You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/18 20:41:57 UTC
[07/22] incubator-apex-malhar git commit: APEXMALHAR-2095 removed
checkstyle violations of malhar library module
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
index d78df94..63c75ef 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalBatchOutputOperatorTest.java
@@ -18,20 +18,33 @@
*/
package com.datatorrent.lib.db.jdbc;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Random;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator.ProcessingMode;
-import com.datatorrent.netlet.util.DTThrowable;
-import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*;
import com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TestEvent;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.netlet.util.DTThrowable;
+
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID;
+import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.TABLE_NAME;
import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.DB_DRIVER;
import static com.datatorrent.lib.db.jdbc.JdbcOperatorTest.URL;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import java.sql.*;
-import java.util.Random;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Test for {@link AbstractJdbcNonTransactionableBatchOutputOperator}
@@ -54,17 +67,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
try {
Class.forName(DB_DRIVER).newInstance();
con = DriverManager.getConnection(URL);
- }
- catch (SQLException ex) {
- DTThrowable.rethrow(ex);
- }
- catch (ClassNotFoundException ex) {
- DTThrowable.rethrow(ex);
- }
- catch (InstantiationException ex) {
- DTThrowable.rethrow(ex);
- }
- catch (IllegalAccessException ex) {
+ } catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException ex) {
DTThrowable.rethrow(ex);
}
}
@@ -77,8 +80,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
try {
con.close();
- }
- catch (SQLException ex) {
+ } catch (SQLException ex) {
DTThrowable.rethrow(ex);
}
}
@@ -108,8 +110,7 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
int count = resultSet.getInt(1);
stmt.close();
return count;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("fetching count", e);
}
}
@@ -145,55 +146,40 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < HALF_BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 1,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should not be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should not be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(2);
- for(int batchCounter = 0;
- batchCounter < HALF_BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 2,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should not be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should not be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.teardown();
}
@@ -207,36 +193,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.getStore().disconnect();
@@ -249,46 +225,33 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
outputOperator.setup(context);
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2* BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 1,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 3 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
}
@Test
@@ -300,36 +263,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < HALF_BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.getStore().disconnect();
@@ -339,49 +292,38 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L);
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
- OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
outputOperator.setup(context);
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < HALF_BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 1,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 1, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
}
@Test
@@ -393,35 +335,25 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.getStore().disconnect();
@@ -436,20 +368,15 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(2);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 2,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 3 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 3 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
}
@Test
@@ -461,36 +388,26 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
outputOperator.beginWindow(0);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(1);
- for(int batchCounter = 0;
- batchCounter < HALF_BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < HALF_BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.getStore().disconnect();
@@ -503,28 +420,20 @@ public class JdbcNonTransactionalBatchOutputOperatorTest
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
outputOperator.setup(context);
- Assert.assertEquals("Commit window id ",
- 0,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 0, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
outputOperator.beginWindow(2);
- for(int batchCounter = 0;
- batchCounter < BATCH_SIZE;
- batchCounter++) {
+ for (int batchCounter = 0; batchCounter < BATCH_SIZE; batchCounter++) {
outputOperator.input.put(new TestEvent(random.nextInt()));
}
outputOperator.endWindow();
- Assert.assertEquals("Commit window id ",
- 2,
- outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
- Assert.assertEquals("Batch should be written",
- 2 * BATCH_SIZE,
- outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
+ Assert.assertEquals("Commit window id ", 2, outputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID));
+ Assert.assertEquals("Batch should be written", 2 * BATCH_SIZE,
+ outputOperator.getNumOfEventsInStore(outputOperator.getStore().connection));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
index d539aaa..9880aae 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalOutputOperatorTest.java
@@ -32,10 +32,11 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
import com.datatorrent.api.DAG;
-import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
-import com.google.common.collect.Lists;
+import com.datatorrent.netlet.util.DTThrowable;
/**
* Test for {@link AbstractJdbcNonTransactionableOutputOperator Operator}
@@ -70,8 +71,7 @@ public class JdbcNonTransactionalOutputOperatorTest
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
stmt.executeUpdate(createTable);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
@@ -84,8 +84,7 @@ public class JdbcNonTransactionalOutputOperatorTest
String cleanTable = "delete from " + TABLE_NAME;
stmt.executeUpdate(cleanTable);
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException(e);
}
}
@@ -122,12 +121,11 @@ public class JdbcNonTransactionalOutputOperatorTest
String countQuery = "SELECT * FROM " + TABLE_NAME;
ResultSet resultSet = stmt.executeQuery(countQuery);
int count = 0;
- while(resultSet.next()) {
+ while (resultSet.next()) {
count++;
}
return count;
- }
- catch (SQLException e) {
+ } catch (SQLException e) {
throw new RuntimeException("fetching count", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
index 56359fb..ef8f9a0 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcNonTransactionalStoreTest.java
@@ -32,8 +32,7 @@ public class JdbcNonTransactionalStoreTest
JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
try {
jdbcNonTransactionalStore.beginTransaction();
- }
- catch(RuntimeException e) {
+ } catch (RuntimeException e) {
return;
}
Assert.fail("Exception should be thrown");
@@ -45,8 +44,7 @@ public class JdbcNonTransactionalStoreTest
JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
try {
jdbcNonTransactionalStore.commitTransaction();
- }
- catch(RuntimeException e) {
+ } catch (RuntimeException e) {
return;
}
Assert.fail("Exception should be thrown");
@@ -58,8 +56,7 @@ public class JdbcNonTransactionalStoreTest
JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
try {
jdbcNonTransactionalStore.rollbackTransaction();
- }
- catch(RuntimeException e) {
+ } catch (RuntimeException e) {
return;
}
Assert.fail("Exception should be thrown");
@@ -71,8 +68,7 @@ public class JdbcNonTransactionalStoreTest
JdbcNonTransactionalStore jdbcNonTransactionalStore = new JdbcNonTransactionalStore();
try {
jdbcNonTransactionalStore.isInTransaction();
- }
- catch(RuntimeException e) {
+ } catch (RuntimeException e) {
return;
}
Assert.fail("Exception should be thrown");
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
index d9daf97..98be88c 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/JsonFormatterTest.java
@@ -25,18 +25,22 @@ import java.io.PrintStream;
import java.util.Date;
import java.util.List;
-import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.google.common.collect.Lists;
public class JsonFormatterTest
{
@@ -150,7 +154,7 @@ public class JsonFormatterTest
Assert.assertEquals(1, validDataSink.collectedTuples.size());
Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
String expectedJSONString = "{\"a\":0,\"b\":0,\"c\":null,\"d\":null,\"date\":null}";
- System.out.println(validDataSink.collectedTuples.get(0));
+ LOG.debug("{}", validDataSink.collectedTuples.get(0));
Assert.assertEquals(expectedJSONString, validDataSink.collectedTuples.get(0));
}
@@ -199,4 +203,6 @@ public class JsonFormatterTest
{
}
+ private static final Logger LOG = LoggerFactory.getLogger(JsonFormatterTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
index 50ed3bd..bb51ca4 100644
--- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java
@@ -33,12 +33,13 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import com.datatorrent.lib.parser.XmlParser;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
@@ -76,6 +77,7 @@ public class XmlFormatterTest
}
}
+
@Test
public void testOperatorSerialization()
{
@@ -162,7 +164,7 @@ public class XmlFormatterTest
operator.setup(null);
operator.in.process(e);
- System.out.println(validDataSink.collectedTuples.get(0));
+ LOG.debug("{}", validDataSink.collectedTuples.get(0));
Assert.assertEquals(1, validDataSink.collectedTuples.size());
Assert.assertEquals(0, invalidDataSink.collectedTuples.size());
String expected = "<EmployeeBean>" + "<name>john</name>"
@@ -202,7 +204,7 @@ public class XmlFormatterTest
}
- @XmlType (propOrder={"name","dept","eid", "dateOfJoining", "address"})
+ @XmlType(propOrder = {"name", "dept", "eid", "dateOfJoining", "address"})
public static class EmployeeBean
{
@@ -292,4 +294,6 @@ public class XmlFormatterTest
}
+ private static final Logger LOG = LoggerFactory.getLogger(XmlFormatterTest.class);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
index d8138d5..2ece6b2 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/OperatorContextTestHelper.java
@@ -34,7 +34,7 @@ import com.datatorrent.api.Context.OperatorContext;
*/
public class OperatorContextTestHelper
{
- private final static ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>()
+ private static final ThreadLocal<DateFormat> DATE_FORMAT_THREAD_LOCAL = new ThreadLocal<DateFormat>()
{
@Override
protected DateFormat initialValue()
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
index 2cc0e1d..9d501aa 100644
--- a/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
+++ b/library/src/test/java/com/datatorrent/lib/helper/SamplePubSubWebSocketServlet.java
@@ -65,11 +65,9 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
if (topic != null) {
subscriber = this;
}
- }
- else if (type.equals("unsubscribe")) {
+ } else if (type.equals("unsubscribe")) {
subscriber = null;
- }
- else if (type.equals("publish")) {
+ } else if (type.equals("publish")) {
Object data = map.get("data");
if (data != null) {
if (subscriber != null) {
@@ -77,8 +75,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
}
}
}
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
LOG.warn("Data read error", ex);
}
}
@@ -109,8 +106,7 @@ public class SamplePubSubWebSocketServlet extends WebSocketServlet
map.put("data", data);
try {
webSocket.connection.sendMessage(mapper.writeValueAsString(map));
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
LOG.warn("Connection send error", ex);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
index e619ff8..a2c021e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/ApacheRandomLogsTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.io;
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.lib.io.ApacheGenRandomLogs;
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
@@ -29,37 +28,39 @@ import com.datatorrent.lib.testbench.CollectorTestSink;
*/
public class ApacheRandomLogsTest
{
- @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+ @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
@Test
- public void test()
- {
- ApacheGenRandomLogs oper = new ApacheGenRandomLogs();
- CollectorTestSink sink = new CollectorTestSink();
- oper.outport.setSink(sink);
- oper.setup(null);
+ public void test()
+ {
+ ApacheGenRandomLogs oper = new ApacheGenRandomLogs();
+ CollectorTestSink sink = new CollectorTestSink();
+ oper.outport.setSink(sink);
+ oper.setup(null);
- Thread t = new EmitTuples(oper);
- t.start();
- try
- {
- Thread.sleep(1000);
- } catch (InterruptedException e)
- {
- }
- t.stop();
- Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0);
- }
+ Thread t = new EmitTuples(oper);
+ t.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ //Fixme
+ }
+ t.stop();
+ Assert.assertTrue("Tuples emitted", sink.collectedTuples.size() > 0);
+ }
- private class EmitTuples extends Thread {
- private ApacheGenRandomLogs oper;
- public EmitTuples(ApacheGenRandomLogs oper)
- {
- this.oper = oper;
- }
- @Override
- public void run()
- {
- oper.emitTuples();
- }
- }
+ private class EmitTuples extends Thread
+ {
+ private ApacheGenRandomLogs oper;
+
+ public EmitTuples(ApacheGenRandomLogs oper)
+ {
+ this.oper = oper;
+ }
+
+ @Override
+ public void run()
+ {
+ oper.emitTuples();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
index ada1148..959e25e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpJsonChunksInputOperatorTest.java
@@ -29,7 +29,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.io.IOUtils;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.server.Handler;
@@ -39,6 +38,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+
import com.datatorrent.lib.testbench.CollectorTestSink;
/**
@@ -74,8 +75,7 @@ public class HttpJsonChunksInputOperatorTest
response.getOutputStream().println();
response.getOutputStream().println(0);
response.getOutputStream().flush();
- }
- catch (JSONException e) {
+ } catch (JSONException e) {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error generating response: " + e.toString());
}
@@ -88,8 +88,6 @@ public class HttpJsonChunksInputOperatorTest
server.start();
String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
- System.out.println(url);
-
final AbstractHttpInputOperator operator = new HttpJsonChunksInputOperator();
CollectorTestSink sink = new CollectorTestSink();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
index 10ec6c2..be405ab 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpLinesInputOperatorTest.java
@@ -28,7 +28,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.apache.commons.io.IOUtils;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -36,6 +35,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
@@ -79,7 +80,6 @@ public class HttpLinesInputOperatorTest
server.start();
String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
- System.out.println(url);
final HttpLinesInputOperator operator = new HttpLinesInputOperator();
CollectorTestSink<String> sink = TestUtils.setSink(operator.outputPort, new CollectorTestSink<String>());
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
index b321cfa..927317e 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpMultiValuedMapGetOperatorTest.java
@@ -28,9 +28,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MultivaluedMap;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.core.util.MultivaluedMapImpl;
-
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
@@ -38,6 +35,9 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
index eb69bdf..afe518b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/HttpPostOutputOperatorTest.java
@@ -31,7 +31,6 @@ import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
-import org.apache.commons.io.IOUtils;
import org.codehaus.jettison.json.JSONObject;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
@@ -40,6 +39,8 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+
/**
* Functional test for {@link com.datatorrent.lib.io.HttpPostOutputOperator}.
*/
@@ -75,8 +76,6 @@ public class HttpPostOutputOperatorTest
server.start();
String url = "http://localhost:" + server.getConnectors()[0].getLocalPort() + "/somecontext";
- System.out.println("url: " + url);
-
HttpPostOutputOperator<Object> node = new HttpPostOutputOperator<Object>();
node.setUrl(url);
@@ -95,7 +94,6 @@ public class HttpPostOutputOperatorTest
}
Assert.assertEquals("number requests", 1, receivedMessages.size());
- System.out.println(receivedMessages.get(0));
JSONObject json = new JSONObject(data);
Assert.assertTrue("request body " + receivedMessages.get(0), receivedMessages.get(0).contains(json.toString()));
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index 4b29830..acb3fc4 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -79,8 +79,7 @@ public class IdempotentStorageManagerTest
storageManager.teardown();
try {
FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
@@ -105,7 +104,7 @@ public class IdempotentStorageManagerTest
testMeta.storageManager.save(data, 1, 1);
testMeta.storageManager.setup(testMeta.context);
@SuppressWarnings("unchecked")
- Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageManager.load(1, 1);
+ Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
Assert.assertEquals("dataOf1", data, decoded);
}
@@ -130,8 +129,7 @@ public class IdempotentStorageManagerTest
for (Integer operatorId : decodedStates.keySet()) {
if (operatorId == 1) {
Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
- }
- else {
+ } else {
Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
}
}
@@ -182,8 +180,7 @@ public class IdempotentStorageManagerTest
testMeta.storageManager.save(dataOf2, 2, 1);
testMeta.storageManager.save(dataOf3, 3, 1);
- testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager),
- Sets.newHashSet(2, 3));
+ testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager), Sets.newHashSet(2, 3));
testMeta.storageManager.setup(testMeta.context);
testMeta.storageManager.deleteUpTo(1, 6);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
index 43f9186..7801619 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataOperatorTest.java
@@ -34,8 +34,7 @@ public abstract class PubSubWebSocketAppDataOperatorTest
public static final URI GATEWAY_CONNECT_ADDRESS;
public static final URI URI_ADDRESS;
- static
- {
+ static {
try {
GATEWAY_CONNECT_ADDRESS = new URI("ws://" + GATEWAY_CONNECT_ADDRESS_STRING + "/pubsub");
URI_ADDRESS = new URI(URI_ADDRESS_STRING);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
index 3dc5be3..fc92429 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketAppDataQueryTest.java
@@ -22,13 +22,11 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
-
import com.datatorrent.common.experimental.AppData.ConnectionInfoProvider;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
public class PubSubWebSocketAppDataQueryTest extends PubSubWebSocketAppDataOperatorTest
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
index 402bb34..e165649 100644
--- a/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/PubSubWebSocketOperatorTest.java
@@ -50,7 +50,7 @@ public class PubSubWebSocketOperatorTest
contextHandler.addServlet(sh, "/pubsub");
contextHandler.addServlet(sh, "/*");
server.start();
- Connector connector[] = server.getConnectors();
+ Connector[] connector = server.getConnectors();
URI uri = URI.create("ws://localhost:" + connector[0].getLocalPort() + "/pubsub");
PubSubWebSocketOutputOperator<Object> outputOperator = new PubSubWebSocketOutputOperator<Object>();
@@ -100,10 +100,10 @@ public class PubSubWebSocketOperatorTest
Assert.assertTrue("tuples emitted", sink.collectedTuples.size() > 1);
@SuppressWarnings("unchecked")
- Map<String, String> tuple = (Map<String, String>) sink.collectedTuples.get(0);
+ Map<String, String> tuple = (Map<String, String>)sink.collectedTuples.get(0);
Assert.assertEquals("Expects {\"hello\":\"world\"} as data", "world", tuple.get("hello"));
- String stringResult = (String) sink.collectedTuples.get(1);
+ String stringResult = (String)sink.collectedTuples.get(1);
Assert.assertEquals("Expects {\"hello\":\"world\"} as data", stringData, stringResult);
inputOperator.deactivate();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
index b4a649a..6bd839d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/SmtpOutputOperatorTest.java
@@ -26,20 +26,22 @@ import javax.mail.Message;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
-import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
+import org.apache.hadoop.conf.Configuration;
+
import com.google.common.collect.Maps;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.ServerSetup;
import com.icegreen.greenmail.util.ServerSetupTest;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
public class SmtpOutputOperatorTest
{
@@ -97,7 +99,7 @@ public class SmtpOutputOperatorTest
String expectedContent = content.replace("{}", data.toString()).trim();
Assert.assertTrue(expectedContent.equals(receivedContent));
- Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
+ Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
Assert.assertEquals(to, messages[0].getRecipients(Message.RecipientType.TO)[0].toString());
Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.TO)[1].toString());
Assert.assertEquals(cc, messages[0].getRecipients(Message.RecipientType.CC)[0].toString());
@@ -121,7 +123,7 @@ public class SmtpOutputOperatorTest
String expectedContent = content.replace("{}", data.toString()).trim();
Assert.assertTrue(expectedContent.equals(receivedContent));
- Assert.assertEquals(from, ((InternetAddress) messages[0].getFrom()[0]).getAddress());
+ Assert.assertEquals(from, ((InternetAddress)messages[0].getFrom()[0]).getAddress());
Assert.assertEquals(to, messages[0].getAllRecipients()[0].toString());
}
@@ -139,7 +141,8 @@ public class SmtpOutputOperatorTest
conf.set(StreamingApplication.DT_PREFIX + "operator.o1.prop.recipients.CC", cc);
final AtomicReference<SmtpOutputOperator> o1 = new AtomicReference<SmtpOutputOperator>();
- StreamingApplication app = new StreamingApplication() {
+ StreamingApplication app = new StreamingApplication()
+ {
@Override
public void populateDAG(DAG dag, Configuration conf)
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
index 79d780d..a79ace7 100644
--- a/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/SocketInputOperatorTest.java
@@ -87,9 +87,8 @@ public class SocketInputOperatorTest
reader.close();
clientChannel.close();
}
- }
- catch (Exception e) {
- // LOG.debug("server ", e);
+ } catch (Exception e) {
+ //fixme
}
}
}
@@ -118,16 +117,15 @@ public class SocketInputOperatorTest
operator.endWindow();
operator.deactivate();
operator.teardown();
- String outputString = (String) sink.collectedTuples.get(0);
+ String outputString = (String)sink.collectedTuples.get(0);
Assert.assertEquals(strBuffer.substring(0, outputString.length()), sink.collectedTuples.get(0));
int length = outputString.length();
- outputString = (String) sink.collectedTuples.get(1);
+ outputString = (String)sink.collectedTuples.get(1);
Assert.assertEquals(strBuffer.substring(length, length + outputString.length()), sink.collectedTuples.get(1));
server.interrupt();
server.join();
Thread.sleep(1000);
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.debug("exception", e);
}
}
@@ -161,15 +159,14 @@ public class SocketInputOperatorTest
int endIndex = 0;
int start = 0;
for (int i = 0; i < 10; i++) {
- endIndex += ((String) sink.collectedTuples.get(i)).length();
+ endIndex += ((String)sink.collectedTuples.get(i)).length();
Assert.assertEquals(strBuffer.substring(start, endIndex), sink.collectedTuples.get(i));
start = endIndex;
}
server.interrupt();
server.join();
Thread.sleep(1000);
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.debug("exception", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
index 184a6bd..5ce5276 100644
--- a/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/WebSocketServerInputOperatorTest.java
@@ -19,19 +19,18 @@
package com.datatorrent.lib.io;
import java.net.URI;
-
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import com.google.common.collect.Lists;
-
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketClient;
import org.eclipse.jetty.websocket.WebSocketClientFactory;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class WebSocketServerInputOperatorTest
{
@Test
@@ -57,11 +56,10 @@ public class WebSocketServerInputOperatorTest
long startTime = System.currentTimeMillis();
- while(startTime + 10000 > System.currentTimeMillis()) {
- if(TestWSSIO.messages.size() >= 1) {
+ while (startTime + 10000 > System.currentTimeMillis()) {
+ if (TestWSSIO.messages.size() >= 1) {
break;
}
-
Thread.sleep(100);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
index b50fe29..d11125b 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorFailureHandlingTest.java
@@ -18,23 +18,35 @@
*/
package com.datatorrent.lib.io.fs;
-import com.datatorrent.api.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashSet;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.lib.util.TestUtils.TestInfo;
-import com.google.common.collect.*;
-import java.io.*;
-import java.util.*;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.*;
-import org.junit.*;
public class AbstractFileInputOperatorFailureHandlingTest
{
- @Rule public TestInfo testMeta = new TestInfo();
+ @Rule
+ public TestInfo testMeta = new TestInfo();
public static class TestFileInputOperator extends AbstractFileInputOperator<String>
{
@@ -60,7 +72,8 @@ public class AbstractFileInputOperatorFailureHandlingTest
br = null;
}
- @Override protected InputStream retryFailedFile(FailedFile ff) throws IOException
+ @Override
+ protected InputStream retryFailedFile(FailedFile ff) throws IOException
{
count = 0;
return super.retryFailedFile(ff);
@@ -90,13 +103,13 @@ public class AbstractFileInputOperatorFailureHandlingTest
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.getDir()).getAbsolutePath()), true);
HashSet<String> allLines = Sets.newHashSet();
// Create files with 100 records.
- for (int file=0; file<10; file++) {
+ for (int file = 0; file < 10; file++) {
HashSet<String> lines = Sets.newHashSet();
- for (int line=0; line<10; line++) {
- lines.add("f"+file+"l"+line);
+ for (int line = 0; line < 10; line++) {
+ lines.add("f" + file + "l" + line);
}
allLines.addAll(lines);
- FileUtils.write(new File(testMeta.getDir(), "file"+file), StringUtils.join(lines, '\n'));
+ FileUtils.write(new File(testMeta.getDir(), "file" + file), StringUtils.join(lines, '\n'));
}
Thread.sleep(10);
@@ -104,15 +117,16 @@ public class AbstractFileInputOperatorFailureHandlingTest
TestFileInputOperator oper = new TestFileInputOperator();
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.getDir());
oper.getScanner().setFilePatternRegexp(".*file[\\d]");
- oper.setup(new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
- for (long wid=0; wid<1000; wid++) {
+ oper.setup(
+ new OperatorContextTestHelper.TestIdOperatorContext(1, new Attribute.AttributeMap.DefaultAttributeMap()));
+ for (long wid = 0; wid < 1000; wid++) {
oper.beginWindow(wid);
oper.emitTuples();
oper.endWindow();
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index 3a8661c..ea16185 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -86,34 +86,35 @@ public class AbstractFileInputOperatorTest
}
}
- @Rule public TestMeta testMeta = new TestMeta();
+ @Rule
+ public TestMeta testMeta = new TestMeta();
@Test
public void testSinglePartiton() throws Exception
{
FileContext.getLocalFSFileContext().delete(new Path(new File(testMeta.dir).getAbsolutePath()), true);
HashSet<String> allLines = Sets.newHashSet();
- for (int file=0; file<2; file++) {
+ for (int file = 0; file < 2; file++) {
HashSet<String> lines = Sets.newHashSet();
- for (int line=0; line<2; line++) {
- lines.add("f"+file+"l"+line);
+ for (int line = 0; line < 2; line++) {
+ lines.add("f" + file + "l" + line);
}
allLines.addAll(lines);
- FileUtils.write(new File(testMeta.dir, "file"+file), StringUtils.join(lines, '\n'));
+ FileUtils.write(new File(testMeta.dir, "file" + file), StringUtils.join(lines, '\n'));
}
LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
oper.getScanner().setFilePatternRegexp(".*file[\\d]");
oper.setup(testMeta.context);
- for (long wid=0; wid<3; wid++) {
+ for (long wid = 0; wid < 3; wid++) {
oper.beginWindow(wid);
oper.emitTuples();
oper.endWindow();
@@ -133,8 +134,8 @@ public class AbstractFileInputOperatorTest
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
FileContext.getLocalFSFileContext().delete(path, true);
- for (int file=0; file<4; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "");
+ for (int file = 0; file < 4; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "");
}
FileSystem fs = FileSystem.get(FileContext.getLocalFSFileContext().getDefaultFileSystem().getUri(), new Configuration());
@@ -158,13 +159,14 @@ public class AbstractFileInputOperatorTest
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
FileContext.getLocalFSFileContext().delete(path, true);
- for (int file=0; file<4; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "");
+ for (int file = 0; file < 4; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "");
}
List<Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList();
partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
- Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions, new PartitioningContextImpl(null, 2));
+ Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions,
+ new PartitioningContextImpl(null, 2));
Assert.assertEquals(2, newPartitions.size());
Assert.assertEquals(1, oper.getCurrentPartitions()); // partitioned() wasn't called
@@ -202,20 +204,20 @@ public class AbstractFileInputOperatorTest
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
FileContext.getLocalFSFileContext().delete(path, true);
int file;
- for (file=0; file<4; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+ for (file = 0; file < 4; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
}
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
int wid = 0;
// Read all records to populate processedList in operator.
oper.setup(testMeta.context);
- for(int i = 0; i < 10; i++) {
+ for (int i = 0; i < 10; i++) {
oper.beginWindow(wid);
oper.emitTuples();
oper.endWindow();
@@ -233,7 +235,7 @@ public class AbstractFileInputOperatorTest
partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
// incremental capacity controlled partitionCount property
Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = initialState.definePartitions(partitions,
- new PartitioningContextImpl(null, 0));
+ new PartitioningContextImpl(null, 0));
Assert.assertEquals(2, newPartitions.size());
Assert.assertEquals(1, initialState.getCurrentPartitions());
Map<Integer, Partition<AbstractFileInputOperator<String>>> m = Maps.newHashMap();
@@ -253,8 +255,8 @@ public class AbstractFileInputOperatorTest
}
sink.clear();
- for(int i = 0; i < 10; i++) {
- for(AbstractFileInputOperator<String> o : opers) {
+ for (int i = 0; i < 10; i++) {
+ for (AbstractFileInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
@@ -266,12 +268,12 @@ public class AbstractFileInputOperatorTest
Assert.assertEquals("No new tuples read ", 0, sink.collectedTuples.size());
// Add four new files with 3 records each.
- for (; file<8; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+ for (; file < 8; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
}
- for(int i = 0; i < 10; i++) {
- for(AbstractFileInputOperator<String> o : opers) {
+ for (int i = 0; i < 10; i++) {
+ for (AbstractFileInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
@@ -306,20 +308,20 @@ public class AbstractFileInputOperatorTest
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
FileContext.getLocalFSFileContext().delete(path, true);
int file;
- for (file=0; file<4; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+ for (file = 0; file < 4; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
}
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
int wid = 0;
//Read some records
oper.setup(testMeta.context);
- for(int i = 0; i < 5; i++) {
+ for (int i = 0; i < 5; i++) {
oper.beginWindow(wid);
oper.emitTuples();
oper.endWindow();
@@ -357,8 +359,8 @@ public class AbstractFileInputOperatorTest
}
sink.clear();
- for(int i = 0; i < 10; i++) {
- for(AbstractFileInputOperator<String> o : opers) {
+ for (int i = 0; i < 10; i++) {
+ for (AbstractFileInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
@@ -391,20 +393,20 @@ public class AbstractFileInputOperatorTest
Path path = new Path(new File(testMeta.dir).getAbsolutePath());
FileContext.getLocalFSFileContext().delete(path, true);
int file;
- for (file=0; file<4; file++) {
- FileUtils.write(new File(testMeta.dir, "partition00"+file), "a\nb\nc\n");
+ for (file = 0; file < 4; file++) {
+ FileUtils.write(new File(testMeta.dir, "partition00" + file), "a\nb\nc\n");
}
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
- @SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
int wid = 0;
//Read some records
oper.setup(testMeta.context);
- for(int i = 0; i < 5; i++) {
+ for (int i = 0; i < 5; i++) {
oper.beginWindow(wid);
oper.emitTuples();
oper.endWindow();
@@ -442,8 +444,8 @@ public class AbstractFileInputOperatorTest
}
sink.clear();
- for(int i = 0; i < 10; i++) {
- for(AbstractFileInputOperator<String> o : opers) {
+ for (int i = 0; i < 10; i++) {
+ for (AbstractFileInputOperator<String> o : opers) {
o.beginWindow(wid);
o.emitTuples();
o.endWindow();
@@ -475,7 +477,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({ "unchecked", "rawtypes" })
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -510,7 +512,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -545,7 +547,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -581,7 +583,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -713,7 +715,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -775,7 +777,7 @@ public class AbstractFileInputOperatorTest
CollectorTestSink<String> queryResults = new CollectorTestSink<String>();
@SuppressWarnings({"unchecked", "rawtypes"})
- CollectorTestSink<Object> sink = (CollectorTestSink) queryResults;
+ CollectorTestSink<Object> sink = (CollectorTestSink)queryResults;
oper.output.setSink(sink);
oper.setDirectory(testMeta.dir);
@@ -837,7 +839,7 @@ public class AbstractFileInputOperatorTest
List<TestStorageManager> storageManagers = Lists.newLinkedList();
for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
- storageManagers.add((TestStorageManager) p.getPartitionedInstance().idempotentStorageManager);
+ storageManagers.add((TestStorageManager)p.getPartitionedInstance().idempotentStorageManager);
}
Assert.assertEquals("count of storage managers", 2, storageManagers.size());