You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by qu...@apache.org on 2016/05/27 17:33:37 UTC
[3/5] incubator-quarks git commit: [QUARKS-181] Simplify code
[QUARKS-181] Simplify code
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/3b66fac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/3b66fac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/3b66fac4
Branch: refs/heads/master
Commit: 3b66fac41599aa2401e6036045b382953ae47e9b
Parents: 46d7ac2
Author: Queenie Ma <qu...@gmail.com>
Authored: Thu May 26 15:13:20 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Fri May 27 09:50:42 2016 -0700
----------------------------------------------------------------------
.../connectors/file/FileStreamsGlobalTest.java | 19 +-
.../test/connectors/file/FileStreamsTest.java | 30 +--
.../FileStreamsTextFileWriterGlobalTest.java | 86 +-------
.../file/FileStreamsTextFileWriterTest.java | 166 +++++---------
.../test/connectors/http/HttpGlobalTest.java | 21 +-
.../quarks/test/connectors/http/HttpTest.java | 15 +-
.../connectors/jdbc/JdbcStreamsGlobalTest.java | 41 +---
.../test/connectors/jdbc/JdbcStreamsTest.java | 124 +++++------
.../kafka/KafkaStreamsGlobalTestManual.java | 31 +--
.../kafka/KafkaStreamsTestManual.java | 60 ++---
.../mqtt/MqttStreamsGlobalTestManual.java | 111 +---------
.../connectors/mqtt/MqttStreamsTestManual.java | 220 ++++++-------------
.../connectors/pubsub/PubSubGlobalTest.java | 22 +-
.../test/connectors/pubsub/PubSubTest.java | 42 ++--
.../websocket/WebSocketClientGlobalTest.java | 95 +-------
.../javax/websocket/WebSocketClientTest.java | 209 +++++++-----------
16 files changed, 359 insertions(+), 933 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
index 46abbd3..91514fd 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
@@ -18,31 +18,18 @@ under the License.
*/
package quarks.test.connectors.file;
-import org.junit.Test;
-
/**
* FileStreams connector globalization tests.
*/
public class FileStreamsGlobalTest extends FileStreamsTest {
- String[] globalLines = new String[] {
+ private static final String[] globalLines = new String[] {
"\u5b78\u800c\u6642\u7fd2\u4e4b",
"\u4e0d\u4ea6\u8aaa\u4e4e"
};
- @Test
- public void testGlobalTextFileReader() throws Exception {
- super.testTextFileReader(globalLines);
- }
-
- @Test
- public void testGlobalTextFileReaderProblemPaths() throws Exception {
- super.testTextFileReaderProblemPaths(globalLines);
- }
-
- @Test
- public void testGlobalTextFileReaderPrePost() throws Exception {
- super.testTextFileReaderPrePost(globalLines);
+ public String[] getLines() {
+ return globalLines;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
index 7d6dea2..1d52e5e 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
@@ -58,6 +58,10 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
"But make allowance for their doubting too;"
};
+ public String[] getLines() {
+ return stdLines;
+ }
+
/**
* Test that directory watcher creates the correct output.
*/
@@ -188,23 +192,9 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
@Test
public void testTextFileReader() throws Exception {
- testTextFileReader(stdLines);
- }
-
- @Test
- public void testTextFileReaderProblemPaths() throws Exception {
- testTextFileReaderProblemPaths(stdLines);
- }
-
- @Test
- public void testTextFileReaderPrePost() throws Exception {
- testTextFileReaderPrePost(stdLines);
- }
-
- public void testTextFileReader(String[] stdLines) throws Exception {
Topology t = newTopology("testTextFileReader");
- String[] lines = stdLines;
+ String[] lines = getLines();
String[] ucLines = Stream.of(lines)
.map(line -> line.toUpperCase())
.toArray(String[]::new);
@@ -227,10 +217,11 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
}
}
- public void testTextFileReaderProblemPaths(String[] stdLines) throws Exception {
+ @Test
+ public void testTextFileReaderProblemPaths() throws Exception {
Topology t = newTopology("testTextFileReaderProblemPaths");
- String[] lines = stdLines;
+ String[] lines = getLines();
String[] ucLines = Stream.of(lines)
.map(line -> line.toUpperCase())
.toArray(String[]::new);
@@ -258,10 +249,11 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
}
}
- public void testTextFileReaderPrePost(String[] stdLines) throws Exception {
+ @Test
+ public void testTextFileReaderPrePost() throws Exception {
Topology t = newTopology("testTextFileReaderPrePost");
- String[] lines = stdLines;
+ String[] lines = getLines();
String[] ucLines = Stream.of(lines)
.map(line -> line.toUpperCase())
.toArray(String[]::new);
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
index 8a372fb..86213c6 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
@@ -18,99 +18,25 @@ under the License.
*/
package quarks.test.connectors.file;
-import org.junit.Test;
-
/**
* FileStreamsTextFileWriter connector globalization tests.
*/
public class FileStreamsTextFileWriterGlobalTest extends FileStreamsTextFileWriterTest {
- String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
- String[] globalLines = new String[] {
+ private static final String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
+ private static final String[] globalLines = new String[] {
"1-"+globalStr,
"2-"+globalStr,
"3-"+globalStr,
"4-"+globalStr
};
- @Test
- public void testGlobalOneFileCreated() throws Exception {
- super.testOneFileCreated(globalLines);
- }
-
- @Test
- public void testGlobalManyFiles() throws Exception {
- super.testManyFiles(globalLines);
- }
-
- @Test
- public void testGlobalManyFilesSlow() throws Exception {
- super.testManyFilesSlow(globalLines);
- }
-
- @Test
- public void testGlobalRetainCntBased() throws Exception {
- super.testRetainCntBased(globalLines);
- }
-
- @Test
- public void testGlobalRetainAggSizeBased() throws Exception {
- super.testRetainAggSizeBased(globalLines, globalStr);
- }
-
- @Test
- public void testGlobalRetainAgeBased() throws Exception {
- super.testRetainAgeBased(globalLines);
- }
-
- @Test
- public void testGlobalFlushImplicit() throws Exception {
- super.testFlushImplicit(globalLines);
- }
-
- @Test
- public void testGlobalFlushCntBased() throws Exception {
- super.testFlushCntBased(globalLines);
- }
-
- @Test
- public void testGlobalFlushTimeBased() throws Exception {
- super.testFlushTimeBased(globalLines);
- }
-
- @Test
- public void testGlobalFlushTupleBased() throws Exception {
- super.testFlushTupleBased(globalLines);
- }
-
- @Test
- public void testGlobalCycleCntBased() throws Exception {
- super.testCycleCntBased(globalLines);
- }
-
- @Test
- public void testGlobalCycleSizeBased() throws Exception {
- super.testCycleSizeBased(globalLines);
- }
-
- @Test
- public void testGlobalCycleTimeBased() throws Exception {
- super.testCycleTimeBased(globalLines);
- }
-
- @Test
- public void testGlobalCycleTupleBased() throws Exception {
- super.testCycleTupleBased(globalLines);
- }
-
- @Test
- public void testGlobalAllTimeBased() throws Exception {
- super.testAllTimeBased(globalLines);
+ public String getStr() {
+ return globalStr;
}
- @Test
- public void testGlobalWriterWatcherReader() throws Exception {
- super.testWriterWatcherReader(globalLines);
+ public String[] getLines() {
+ return globalLines;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
index 74139b2..d9b25c2 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
@@ -72,6 +72,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
private int TMO_SEC = 2;
+ public String getStr() {
+ return str;
+ }
+
+ public String[] getLines() {
+ return stdLines;
+ }
+
@Test
public void testFlushConfig() throws Exception {
FileWriterFlushConfig<String> cfg;
@@ -271,92 +279,13 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
@Test
public void testOneFileCreated() throws Exception {
- testOneFileCreated(stdLines);
- }
-
- @Test
- public void testManyFiles() throws Exception {
- testManyFiles(stdLines);
- }
-
- @Test
- public void testManyFilesSlow() throws Exception {
- testManyFilesSlow(stdLines);
- }
-
- @Test
- public void testRetainCntBased() throws Exception {
- testRetainCntBased(stdLines);
- }
-
- @Test
- public void testRetainAggSizeBased() throws Exception {
- testRetainAggSizeBased(stdLines, str);
- }
-
- @Test
- public void testRetainAgeBased() throws Exception {
- testRetainAgeBased(stdLines);
- }
-
- @Test
- public void testFlushImplicit() throws Exception {
- testFlushImplicit(stdLines);
- }
-
- @Test
- public void testFlushCntBased() throws Exception {
- testFlushCntBased(stdLines);
- }
-
- @Test
- public void testFlushTimeBased() throws Exception {
- testFlushTimeBased(stdLines);
- }
-
- @Test
- public void testFlushTupleBased() throws Exception {
- testFlushTupleBased(stdLines);
- }
-
- @Test
- public void testCycleCntBased() throws Exception {
- testCycleCntBased(stdLines);
- }
-
- @Test
- public void testCycleSizeBased() throws Exception {
- testCycleSizeBased(stdLines);
- }
-
- @Test
- public void testCycleTimeBased() throws Exception {
- testCycleTimeBased(stdLines);
- }
-
- @Test
- public void testCycleTupleBased() throws Exception {
- testCycleTupleBased(stdLines);
- }
-
- @Test
- public void testAllTimeBased() throws Exception {
- testAllTimeBased(stdLines);
- }
-
- @Test
- public void testWriterWatcherReader() throws Exception {
- testWriterWatcherReader(stdLines);
- }
-
- public void testOneFileCreated(String[] stdLines) throws Exception {
// all lines into a single (the first) file
Topology t = newTopology("testOneFileCreated");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net all in one, the first, file
@@ -373,13 +302,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
assertNotNull(sink);
}
- public void testManyFiles(String[] stdLines) throws Exception {
+ @Test
+ public void testManyFiles() throws Exception {
Topology t = newTopology("testManyFiles");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuples per file
@@ -400,13 +330,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testManyFilesSlow(String[] stdLines) throws Exception {
+ @Test
+ public void testManyFilesSlow() throws Exception {
Topology t = newTopology("testManyFilesSlow");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuples per file
@@ -429,14 +360,15 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
basePath, expResults);
}
- public void testRetainCntBased(String[] stdLines) throws Exception {
+ @Test
+ public void testRetainCntBased() throws Exception {
// more lines than configured retained numFiles; only keep the last numFiles
Topology t = newTopology("testRetainCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuples per file
@@ -458,20 +390,21 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testRetainAggSizeBased(String[] stdLines, String str) throws Exception {
+ @Test
+ public void testRetainAggSizeBased() throws Exception {
// more aggsize than configured; only keep aggsize worth
Topology t = newTopology("testRetainAggSizeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuple per file
List<List<String>> expResults = buildExpResults(lines, tuple -> true);
// agg size only enough for last two lines
- long aggregateFileSize = 2 * (("1-"+str).length() + 1/*eol*/);
+ long aggregateFileSize = 2 * (("1-"+getStr()).length() + 1/*eol*/);
expResults.remove(0);
expResults.remove(0);
assertEquals(2, expResults.size());
@@ -488,13 +421,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testRetainAgeBased(String[] stdLines) throws Exception {
+ @Test
+ public void testRetainAgeBased() throws Exception {
Topology t = newTopology("testRetainAgeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
int keepCnt = 2; // only keep the last n files with throttling, age,
@@ -534,13 +468,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
basePath, expResults);
}
- public void testFlushImplicit(String[] stdLines) throws Exception {
+ @Test
+ public void testFlushImplicit() throws Exception {
Topology t = newTopology("testFlushImplicit");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net all in one, the first, file
@@ -558,13 +493,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testFlushCntBased(String[] stdLines) throws Exception {
+ @Test
+ public void testFlushCntBased() throws Exception {
Topology t = newTopology("testFlushCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net all in one, the first, file
@@ -582,13 +518,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testFlushTimeBased(String[] stdLines) throws Exception {
+ @Test
+ public void testFlushTimeBased() throws Exception {
Topology t = newTopology("testFlushTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net all in one, the first, file
@@ -611,13 +548,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
basePath, expResults);
}
- public void testFlushTupleBased(String[] stdLines) throws Exception {
+ @Test
+ public void testFlushTupleBased() throws Exception {
Topology t = newTopology("testFlushTupleBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net all in one, the first, file
@@ -636,13 +574,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testCycleCntBased(String[] stdLines) throws Exception {
+ @Test
+ public void testCycleCntBased() throws Exception {
Topology t = newTopology("testCycleCntBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net two tuples per file
@@ -664,13 +603,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testCycleSizeBased(String[] stdLines) throws Exception {
+ @Test
+ public void testCycleSizeBased() throws Exception {
Topology t = newTopology("testCycleSizeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuple per file
@@ -689,13 +629,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testCycleTimeBased(String[] stdLines) throws Exception {
+ @Test
+ public void testCycleTimeBased() throws Exception {
Topology t = newTopology("testCycleTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// net one tuple per file with 250msec cycle config and 1 throttle
@@ -720,13 +661,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
basePath, expResults);
}
- public void testCycleTupleBased(String[] stdLines) throws Exception {
+ @Test
+ public void testCycleTupleBased() throws Exception {
Topology t = newTopology("testCycleTupleBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// a tuple based config / predicate. in this case should end up with 3 files.
@@ -746,14 +688,15 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testAllTimeBased(String[] stdLines) throws Exception {
+ @Test
+ public void testAllTimeBased() throws Exception {
// exercise case with multiple timer based policies
Topology t = newTopology("testAllTimeBased");
// establish a base path
Path basePath = createTempFile("test1", "txt", new String[0]);
- String[] lines = stdLines;
+ String[] lines = getLines();
// build expected results
// keep all given age and TMO_SEC
@@ -774,7 +717,8 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
}
- public void testWriterWatcherReader(String[] stdLines) throws Exception {
+ @Test
+ public void testWriterWatcherReader() throws Exception {
// verify all the pieces work together
Topology t = newTopology("testWriterWatcherReader");
@@ -782,7 +726,7 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
Path dir = Files.createTempDirectory(testDirPrefix);
Path basePath = dir.resolve("writerCreated");
- String[] lines = stdLines;
+ String[] lines = getLines();
System.out.println("########## "+t.getName());
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
index 7689061..a413f1a 100644
--- a/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
+++ b/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
@@ -18,25 +18,22 @@ under the License.
*/
package quarks.test.connectors.http;
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
/**
* This globalization test goes against http://httpbin.org
* a freely available web-server for testing requests.
*
*/
-public class HttpGlobalTest {
+public class HttpGlobalTest extends HttpTest {
- @Test
- public void testGlobalJsonGet() throws Exception {
- JsonObject request = new JsonObject();
- request.addProperty("a", "\u5b57\u6bcd");
- request.addProperty("b", "\u56db\u5341\u4e8c");
+ private static final String globalProp1 = "\u5b57\u6bcd";
+ private static final String globalProp2 = "\u56db\u5341\u4e8c";
+
+ public String getProp1() {
+ return globalProp1;
+ }
- HttpTest ht = new HttpTest();
- ht.testJsonGet(request);
+ public String getProp2() {
+ return globalProp2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
index 8abcf1f..d75b077 100644
--- a/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
+++ b/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
@@ -46,6 +46,17 @@ import quarks.topology.tester.Tester;
*/
public class HttpTest {
+ private static final String prop1 = "abc";
+ private static final String prop2 = "42";
+
+ public String getProp1() {
+ return prop1;
+ }
+
+ public String getProp2() {
+ return prop2;
+ }
+
@Test
public void testGet() throws Exception {
@@ -129,8 +140,8 @@ public class HttpTest {
@Test
public void testJsonGet() throws Exception {
JsonObject request = new JsonObject();
- request.addProperty("a", "abc");
- request.addProperty("b", "42");
+ request.addProperty("a", getProp1());
+ request.addProperty("b", getProp2());
testJsonGet(request);
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
index bf0c8c1..23d9b3b 100644
--- a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
+++ b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
@@ -20,7 +20,6 @@ package quarks.test.connectors.jdbc;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Test;
/**
* JdbcStreams connector globalization tests.
@@ -51,44 +50,12 @@ public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
}
}
- @Test
- public void testGlobalBasicRead() throws Exception {
- super.testBasicRead(globalPersonList, globalPersonIdList);
+ public List<Person> getPersonList() {
+ return globalPersonList;
}
- @Test
- public void testGlobalBasicRead2() throws Exception {
- super.testBasicRead2(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBasicWrite() throws Exception {
- super.testBasicWrite(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBasicWrite2() throws Exception {
- super.testBasicWrite2(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBadConnectFn() throws Exception {
- super.testBadConnectFn(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBadSQL() throws Exception {
- super.testBadSQL(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBadSetParams() throws Exception {
- super.testBadSetParams(globalPersonList, globalPersonIdList);
- }
-
- @Test
- public void testGlobalBadResultHandler() throws Exception {
- super.testBadResultHandler(globalPersonList, globalPersonIdList);
+ public List<PersonId> getPersonIdList() {
+ return globalPersonIdList;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
index 0f0871a..d0294ee 100644
--- a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
+++ b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
@@ -107,6 +107,14 @@ public class JdbcStreamsTest extends ConnectorTestBase {
}
}
+ public List<Person> getPersonList() {
+ return personList;
+ }
+
+ public List<PersonId> getPersonIdList() {
+ return personIdList;
+ }
+
DataSource getDerbyEmbeddedDataSource(String database) throws Exception
{
// Avoid a compile-time dependency to the jdbc driver.
@@ -236,56 +244,17 @@ public class JdbcStreamsTest extends ConnectorTestBase {
@Test
public void testBasicRead() throws Exception {
- testBasicRead(personList, personIdList);
- }
-
- @Test
- public void testBasicRead2() throws Exception {
- testBasicRead2(personList, personIdList);
- }
-
- @Test
- public void testBasicWrite() throws Exception {
- testBasicWrite(personList, personIdList);
- }
-
- @Test
- public void testBasicWrite2() throws Exception {
- testBasicWrite2(personList, personIdList);
- }
-
- @Test
- public void testBadConnectFn() throws Exception {
- testBadConnectFn(personList, personIdList);
- }
-
- @Test
- public void testBadSQL() throws Exception {
- testBadSQL(personList, personIdList);
- }
-
- @Test
- public void testBadSetParams() throws Exception {
- testBadSetParams(personList, personIdList);
- }
-
- @Test
- public void testBadResultHandler() throws Exception {
- testBadResultHandler(personList, personIdList);
- }
-
- public void testBasicRead(List<Person> personList, List<PersonId> personIdList) throws Exception {
Topology t = this.newTopology("testBasicRead");
- populatePersonsTable(personList);
- List<String> expected = expectedPersons(person->true, personList);
+ populatePersonsTable(getPersonList());
+ List<String> expected = expectedPersons(person->true, getPersonList());
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
dataSource -> connect(dataSource));
// Create a stream of Person from a stream of ids
- TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 0/*msec*/);
+ TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
rcvd.sink(tuple -> System.out.println(
@@ -293,13 +262,14 @@ public class JdbcStreamsTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testBasicRead2(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBasicRead2() throws Exception {
Topology t = newTopology("testBasicRead2");
// same as testBasic but use the explicit PreparedStatement forms
// of executeStatement().
- populatePersonsTable(personList);
- List<String> expected = expectedPersons(person->true, personList);
+ populatePersonsTable(getPersonList());
+ List<String> expected = expectedPersons(person->true, getPersonList());
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
@@ -308,7 +278,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// Create a stream of Person from a stream of ids
// Delay so this runs after populating the db above
TStream<PersonId> personIds = PlumbingStreams.blockingOneShotDelay(
- t.collection(personIdList), 3, TimeUnit.SECONDS);
+ t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
TStream<Person> rcvdPerson = db.executeStatement(personIds,
(cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"
+ " FROM persons WHERE id = ?"),
@@ -330,18 +300,19 @@ public class JdbcStreamsTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testBasicWrite(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBasicWrite() throws Exception {
Topology t = newTopology("testBasicWrite");
truncatePersonsTable();
- List<String> expected = expectedPersons(person->true, personList);
+ List<String> expected = expectedPersons(person->true, getPersonList());
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
dataSource -> connect(dataSource));
// Add stream of Person to the db
- TStream<Person> s = t.collection(personList);
+ TStream<Person> s = t.collection(getPersonList());
TSink<Person> sink = db.executeStatement(s,
() -> "INSERT INTO persons VALUES(?,?,?,?,?)",
(tuple,stmt) -> {
@@ -355,7 +326,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
assertNotNull(sink);
// Use the same code as testBasicRead to verify the write worked.
- TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 3000/*msec*/);
+ TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
rcvd.sink(tuple -> System.out.println(
@@ -363,20 +334,21 @@ public class JdbcStreamsTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testBasicWrite2(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBasicWrite2() throws Exception {
Topology t = newTopology("testBasicWrite2");
// same as testBasic but use the explicit PreparedStatement forms
// of executeStatement().
truncatePersonsTable();
- List<String> expected = expectedPersons(person->true, personList);
+ List<String> expected = expectedPersons(person->true, getPersonList());
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
dataSource -> connect(dataSource));
// Add stream of Person to the db
- TStream<Person> s = t.collection(personList);
+ TStream<Person> s = t.collection(getPersonList());
TSink<Person> sink = db.executeStatement(s,
(cn) -> cn.prepareStatement("INSERT into PERSONS values(?,?,?,?,?)"),
(tuple,stmt) -> {
@@ -391,7 +363,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
assertNotNull(sink);
// Use the same code as testBasicRead to verify the write worked.
- TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 3000/*msec*/);
+ TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
rcvd.sink(tuple -> System.out.println(
@@ -399,7 +371,8 @@ public class JdbcStreamsTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testBadConnectFn(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBadConnectFn() throws Exception {
Topology t = newTopology("testBadConnectFn");
// connFn is only called for initial connect or reconnect
// following certain failures.
@@ -411,9 +384,9 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// right before it?), so that we can verify the conn is closed and
// then reconnected
- populatePersonsTable(personList);
- List<String> expected = expectedPersons(p->true, personList.subList(1, personList.size()));
- int expectedExcCnt = personList.size() - expected.size();
+ populatePersonsTable(getPersonList());
+ List<String> expected = expectedPersons(p->true, getPersonList().subList(1, getPersonList().size()));
+ int expectedExcCnt = getPersonList().size() - expected.size();
AtomicInteger connFnCnt = new AtomicInteger();
JdbcStreams db = new JdbcStreams(t,
@@ -427,7 +400,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// Create a stream of Person from a stream of ids
AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(personIdList);
+ TStream<PersonId> personIds = t.collection(getPersonIdList());
TStream<Person> rcvdPerson = db.executeStatement(personIds,
() -> "SELECT id, firstname, lastname, gender, age"
+ " FROM persons WHERE id = ?",
@@ -454,17 +427,18 @@ public class JdbcStreamsTest extends ConnectorTestBase {
completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
}
-
- public void testBadSQL(List<Person> personList, List<PersonId> personIdList) throws Exception {
+
+ @Test
+ public void testBadSQL() throws Exception {
Topology t = newTopology("testBadSQL");
// the statement is nominally "retrieved" only once, not per-tuple.
// hence, there's not much sense in trying to simulate it
// getting called unsuccessfully, then successfully, etc.
// however, verify the result handler gets called appropriately.
- populatePersonsTable(personList);
+ populatePersonsTable(getPersonList());
List<String> expected = Collections.emptyList();
- int expectedExcCnt = personList.size() - expected.size();
+ int expectedExcCnt = getPersonList().size() - expected.size();
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
@@ -472,7 +446,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// Create a stream of Person from a stream of ids
AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(personIdList);
+ TStream<PersonId> personIds = t.collection(getPersonIdList());
TStream<Person> rcvdPerson = db.executeStatement(personIds,
() -> "SELECT id, firstname, lastname, gender, age"
+ " FROM persons WHERE BOGUS_XYZZY id = ?",
@@ -500,13 +474,14 @@ public class JdbcStreamsTest extends ConnectorTestBase {
assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
}
- public void testBadSetParams(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBadSetParams() throws Exception {
Topology t = newTopology("testBadSetParams");
// exercise and validate behavior with transient parameter setter failures
- populatePersonsTable(personList);
- List<String> expected = expectedPersons(newOddIdPredicate(), personList);
- int expectedExcCnt = personList.size() - expected.size();
+ populatePersonsTable(getPersonList());
+ List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
+ int expectedExcCnt = getPersonList().size() - expected.size();
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
@@ -514,7 +489,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// Create a stream of Person from a stream of ids
AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(personIdList);
+ TStream<PersonId> personIds = t.collection(getPersonIdList());
TStream<Person> rcvdPerson = db.executeStatement(personIds,
() -> "SELECT id, firstname, lastname, gender, age"
+ " FROM persons WHERE id = ?",
@@ -545,13 +520,14 @@ public class JdbcStreamsTest extends ConnectorTestBase {
assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
}
- public void testBadResultHandler(List<Person> personList, List<PersonId> personIdList) throws Exception {
+ @Test
+ public void testBadResultHandler() throws Exception {
Topology t = newTopology("testBadResultHandler");
// exercise and validate behavior with transient result handler failures
- populatePersonsTable(personList);
- List<String> expected = expectedPersons(newOddIdPredicate(), personList);
- int expectedExcCnt = personList.size() - expected.size();
+ populatePersonsTable(getPersonList());
+ List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
+ int expectedExcCnt = getPersonList().size() - expected.size();
JdbcStreams db = new JdbcStreams(t,
() -> getDataSource(DB_NAME),
@@ -559,7 +535,7 @@ public class JdbcStreamsTest extends ConnectorTestBase {
// Create a stream of Person from a stream of ids
AtomicInteger executionExcCnt = new AtomicInteger();
- TStream<PersonId> personIds = t.collection(personIdList);
+ TStream<PersonId> personIds = t.collection(getPersonIdList());
TStream<Person> rcvdPerson = db.executeStatement(personIds,
() -> "SELECT id, firstname, lastname, gender, age"
+ " FROM persons WHERE id = ?",
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
index 33810b0..9ec13f6 100644
--- a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
+++ b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
@@ -18,38 +18,19 @@ under the License.
*/
package quarks.test.connectors.kafka;
-import org.junit.Test;
-
/**
* KafkaStreams connector globalization tests.
*/
public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
- private final String globalMsg1 = "\u4f60\u597d";
- private final String globalMsg2 = "\u4f60\u5728\u55ce";
-
- @Test
- public void testGlobalSimple() throws Exception {
- super.testSimple(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalWithKey() throws Exception {
- super.testWithKey(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalPubSubBytes() throws Exception {
- super.testPubSubBytes(globalMsg1, globalMsg2);
- }
+ private static final String globalMsg1 = "\u4f60\u597d";
+ private static final String globalMsg2 = "\u4f60\u5728\u55ce";
- @Test
- public void testGlobalMultiPub() throws Exception {
- super.testMultiPub(globalMsg1, globalMsg2);
+ public String getMsg1() {
+ return globalMsg1;
}
- @Test(expected=IllegalStateException.class)
- public void testGlobalMultiSubNeg() throws Exception {
- super.testMultiSubNeg(globalMsg1, globalMsg2);
+ public String getMsg2() {
+ return globalMsg2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
index 76d13ad..79b7ab2 100644
--- a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
+++ b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -46,6 +46,14 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
private final String msg1 = "Hello";
private final String msg2 = "Are you there?";
+ public String getMsg1() {
+ return msg1;
+ }
+
+ public String getMsg2() {
+ return msg2;
+ }
+
private String[] getKafkaTopics() {
String csvTopics = System.getProperty("quarks.test.connectors.kafka.csvTopics", "testTopic1,testTopic2");
String[] topics = csvTopics.split(",");
@@ -98,36 +106,12 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
}
@Test
- public void testGlobalSimple() throws Exception {
- testSimple(msg1, msg2);
- }
-
- @Test
- public void testGlobalWithKey() throws Exception {
- testWithKey(msg1, msg2);
- }
-
- @Test
- public void testGlobalPubSubBytes() throws Exception {
- testPubSubBytes(msg1, msg2);
- }
-
- @Test
- public void testGlobalMultiPub() throws Exception {
- testMultiPub(msg1, msg2);
- }
-
- @Test(expected=IllegalStateException.class)
- public void testGlobalMultiSubNeg() throws Exception {
- testMultiSubNeg(msg1, msg2);
- }
-
- public void testSimple(String msg1, String msg2) throws Exception {
+ public void testSimple() throws Exception {
Topology t = newTopology("testSimple");
MsgGenerator mgen = new MsgGenerator(t.getName());
String topic = getKafkaTopics()[0];
String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -149,12 +133,13 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
assertNotNull(sink);
}
- public void testWithKey(String msg1, String msg2) throws Exception {
+ @Test
+ public void testWithKey() throws Exception {
Topology t = newTopology("testWithKey");
MsgGenerator mgen = new MsgGenerator(t.getName());
String topic = getKafkaTopics()[0];
String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<Rec> recs = new ArrayList<>();
int i = 0;
for (String msg : msgs) {
@@ -193,12 +178,13 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testPubSubBytes(String msg1, String msg2) throws Exception {
+ @Test
+ public void testPubSubBytes() throws Exception {
Topology t = newTopology("testPubSubBytes");
MsgGenerator mgen = new MsgGenerator(t.getName());
String topic = getKafkaTopics()[0];
String groupId = newGroupId(t.getName());
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<Rec> recs = new ArrayList<>();
int i = 0;
for (String msg : msgs) {
@@ -236,14 +222,15 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
}
- public void testMultiPub(String msg1, String msg2) throws Exception {
+ @Test
+ public void testMultiPub() throws Exception {
Topology t = newTopology("testMultiPub");
MsgGenerator mgen = new MsgGenerator(t.getName());
String topic1 = getKafkaTopics()[0];
String topic2 = getKafkaTopics()[1];
String groupId = newGroupId(t.getName());
- List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
- List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
List<String> msgs = new ArrayList<>(msgs1);
msgs.addAll(msgs2);
@@ -276,14 +263,15 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
assertNotSame(sink1, sink2);
}
- public void testMultiSubNeg(String msg1, String msg2) throws Exception {
+ @Test(expected=IllegalStateException.class)
+ public void testMultiSubNeg() throws Exception {
Topology t = newTopology("testMultiSubNeg");
MsgGenerator mgen = new MsgGenerator(t.getName());
String topic1 = getKafkaTopics()[0];
String topic2 = getKafkaTopics()[1];
String groupId = newGroupId(t.getName());
- List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
- List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
// Multiple subscribe() on a single connection.
// Currently, w/Kafka0.8.2.2, we only support a single
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
index 49c3de9..995e0c4 100644
--- a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
+++ b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
@@ -18,8 +18,6 @@ under the License.
*/
package quarks.test.connectors.mqtt;
-import org.junit.Test;
-
/**
* MqttStreams globalization tests
* <p>
@@ -35,113 +33,12 @@ public class MqttStreamsGlobalTestManual extends MqttStreamsTestManual {
private final String globalMsg1 = "\u4f60\u597d";
private final String globalMsg2 = "\u4f60\u5728\u55ce";
- @Test
- public void testGlobalStringPublish() throws Exception {
- super.testStringPublish(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalAutoClientId() throws Exception {
- super.testAutoClientId(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalQoS1() throws Exception {
- super.testQoS1(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalQoS2() throws Exception {
- super.testQoS2(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalGenericPublish() throws Exception {
- super.testGenericPublish(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalMultiConnector() throws Exception {
- super.testMultiConnector(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalMultiTopicPublish() throws Exception {
- super.testMultiTopicPublish(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalMultiTopicSubscribe() throws Exception {
- super.testMultiTopicSubscribe(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalConnectFail() throws Exception {
- super.testConnectFail(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalRetainedFalse() throws Exception {
- super.testRetainedFalse(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalRetainedTrue() throws Exception {
- super.testRetainedTrue(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalMultipleServerURL() throws Exception {
- super.testMultipleServerURL(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalActionTime() throws Exception {
- super.testActionTime(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalIdleSubscribe() throws Exception {
- super.testIdleSubscribe(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalIdlePublish() throws Exception {
- super.testIdlePublish(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalConnectRetryPub() throws Exception {
- super.testConnectRetryPub(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalConnectRetrySub() throws Exception {
- super.testConnectRetrySub(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalSubscribeFnThrow() throws Exception {
- super.testSubscribeFnThrow(globalMsg1, globalMsg2);
- }
-
- @Test
- public void testGlobalPublishFnThrow() throws Exception {
- super.testPublishFnThrow(globalMsg1, globalMsg2);
- }
-
- /*
- * See mqtt/src/test/keystores/README for info about SSL/TLS and mosquitto
- */
-
- @Test
- public void testGlobalSsl() throws Exception {
- super.testSsl(globalMsg1, globalMsg2);
+ public String getMsg1() {
+ return globalMsg1;
}
- @Test
- public void testGlobalSslClientAuth() throws Exception {
- super.testSslClientAuth(globalMsg1, globalMsg2);
+ public String getMsg2() {
+ return globalMsg2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
index be5ec4a..b49df9c 100644
--- a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
+++ b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
@@ -83,6 +83,14 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
private final String msg1 = "Hello";
private final String msg2 = "Are you there?";
+ public String getMsg1() {
+ return msg1;
+ }
+
+ public String getMsg2() {
+ return msg2;
+ }
+
@Before
public void setupAuthInfo() {
authInfo.clear();
@@ -185,117 +193,13 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
@Test
public void testStringPublish() throws Exception {
- testStringPublish(msg1, msg2);
- }
-
- @Test
- public void testAutoClientId() throws Exception {
- testAutoClientId(msg1, msg2);
- }
-
- @Test
- public void testQoS1() throws Exception {
- testQoS1(msg1, msg2);
- }
-
- @Test
- public void testQoS2() throws Exception {
- testQoS2(msg1, msg2);
- }
-
- @Test
- public void testGenericPublish() throws Exception {
- testGenericPublish(msg1, msg2);
- }
-
- @Test
- public void testMultiConnector() throws Exception {
- testMultiConnector(msg1, msg2);
- }
-
- @Test
- public void testMultiTopicPublish() throws Exception {
- testMultiTopicPublish(msg1, msg2);
- }
-
- @Test
- public void testMultiTopicSubscribe() throws Exception {
- testMultiTopicSubscribe(msg1, msg2);
- }
-
- @Test
- public void testConnectFail() throws Exception {
- testConnectFail(msg1, msg2);
- }
-
- @Test
- public void testRetainedFalse() throws Exception {
- testRetainedFalse(msg1, msg2);
- }
-
- @Test
- public void testRetainedTrue() throws Exception {
- testRetainedTrue(msg1, msg2);
- }
-
- @Test
- public void testMultipleServerURL() throws Exception {
- testMultipleServerURL(msg1, msg2);
- }
-
- @Test
- public void testActionTime() throws Exception {
- testActionTime(msg1, msg2);
- }
-
- @Test
- public void testIdleSubscribe() throws Exception {
- testIdleSubscribe(msg1, msg2);
- }
-
- @Test
- public void testIdlePublish() throws Exception {
- testIdlePublish(msg1, msg2);
- }
-
- @Test
- public void testConnectRetryPub() throws Exception {
- testConnectRetryPub(msg1, msg2);
- }
-
- @Test
- public void testConnectRetrySub() throws Exception {
- testConnectRetrySub(msg1, msg2);
- }
-
- @Test
- public void testSubscribeFnThrow() throws Exception {
- testSubscribeFnThrow(msg1, msg2);
- }
-
- @Test
- public void testPublishFnThrow() throws Exception {
- testPublishFnThrow(msg1, msg2);
- }
-
- @Test
- public void testSsl() throws Exception {
- testSsl(msg1, msg2);
- }
-
- @Test
- public void testSslClientAuth() throws Exception {
- testSslClientAuth(msg1, msg2);
- }
-
- public void testStringPublish(String msg1, String msg2) throws Exception {
Topology top = newTopology("testStringPublish");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -314,13 +218,14 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
assertNotNull(sink);
}
- public void testAutoClientId(String msg1, String msg2) throws Exception {
+ @Test
+ public void testAutoClientId() throws Exception {
Topology top = newTopology("testAutoClientId");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -335,14 +240,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate("some-auto-clientId", top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testQoS1(String msg1, String msg2) throws Exception {
+ @Test
+ public void testQoS1() throws Exception {
Topology top = newTopology("testQoS1");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 1;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -360,15 +266,16 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
-
- public void testQoS2(String msg1, String msg2) throws Exception {
+
+ @Test
+ public void testQoS2() throws Exception {
Topology top = newTopology("testQoS2");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 2;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -389,7 +296,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testGenericPublish(String msg1, String msg2) throws Exception {
+ @Test
+ public void testGenericPublish() throws Exception {
Topology top = newTopology("testGenericPublish");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -398,7 +306,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
// avoid mucking up other test subscribers expecting UTF-8 string payloads
// and use a different topic
String topic = getMqttTopics()[0] + "-Generic";
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<String> expMsgsAsStr =
msgs
.stream()
@@ -429,7 +337,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
assertNotNull(sink);
}
- public void testMultiConnector(String msg1, String msg2) throws Exception {
+ @Test
+ public void testMultiConnector() throws Exception {
Topology top = newTopology("testMultiConnector");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -437,7 +346,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String pubClientId = newClientId(top.getName())+"_pub";
String subClientId = newClientId(top.getName())+"_sub";
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -455,7 +364,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(pubClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testMultiTopicPublish(String msg1, String msg2) throws Exception {
+ @Test
+ public void testMultiTopicPublish() throws Exception {
Topology top = newTopology("testMultiTopicPublish");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -463,8 +373,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String clientId = newClientId(top.getName());
String topic1 = getMqttTopics()[0];
String topic2 = getMqttTopics()[1];
- List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
- List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
// create an interleaved list
List<Msg> msgs = new ArrayList<>();
for (int i = 0; i < msgs1.size(); i++) {
@@ -519,7 +429,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
assertTrue("valid:" + tc, tc.valid());
}
- public void testMultiTopicSubscribe(String msg1, String msg2) throws Exception {
+ @Test
+ public void testMultiTopicSubscribe() throws Exception {
Topology top = newTopology("testMultiTopicSubscribe");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -528,8 +439,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String topic1 = getMqttTopics()[0] + "/1";
String topic2 = getMqttTopics()[0] + "/2";
String topics = getMqttTopics()[0] + "/+";
- List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
- List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+ List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+ List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
List<String> msgs = new ArrayList<>();
msgs.addAll(msgs1);
msgs.addAll(msgs2);
@@ -567,14 +478,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
mqtt.subscribe(topic, qos); // should throw
}
- public void testConnectFail(String msg1, String msg2) throws Exception {
+ @Test
+ public void testConnectFail() throws Exception {
Topology top = newTopology("testConnectFail");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -621,7 +533,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
return retainedMsg;
}
- public void testRetainedFalse(String msg1, String msg2) throws Exception {
+ @Test
+ public void testRetainedFalse() throws Exception {
Topology top = newTopology("testRetainedFalse");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -637,7 +550,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
System.out.println("=============== setup complete");
// verify the next connect/subscribe [doesn't] sees the retain and then new msgs
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<String> expMsgsAsStr =
msgs
.stream()
@@ -663,7 +576,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvdAsStr, mgen, SEC_TIMEOUT, expMsgsAsStr.toArray(new String[0]));
}
- public void testRetainedTrue(String msg1, String msg2) throws Exception {
+ @Test
+ public void testRetainedTrue() throws Exception {
Topology top = newTopology("testRetainedTrue");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -678,7 +592,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
System.out.println("=============== setup complete");
// verify the next connect/subscribe [doesn't] sees the retain and then new msgs
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<String> expMsgsAsStr =
msgs
.stream()
@@ -912,14 +826,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
propTester.checkAll();
}
- public void testMultipleServerURL(String msg1, String msg2) throws Exception {
+ @Test
+ public void testMultipleServerURL() throws Exception {
Topology top = newTopology("testMultipleServerURL");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -937,14 +852,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testActionTime(String msg1, String msg2) throws Exception {
+ @Test
+ public void testActionTime() throws Exception {
Topology top = newTopology("testActionTime");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -962,7 +878,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testIdleSubscribe(String msg1, String msg2) throws Exception {
+ @Test
+ public void testIdleSubscribe() throws Exception {
Topology top = newTopology("testIdleSubscribe");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -970,7 +887,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String pubClientId = newClientId(top.getName()+"_pub");
String subClientId = newClientId(top.getName()+"_sub");
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
// Exercise idle timeouts. We won't have any direct
// evidence that an idle disconnect/reconnect happen
@@ -1001,7 +918,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testIdlePublish(String msg1, String msg2) throws Exception {
+ @Test
+ public void testIdlePublish() throws Exception {
Topology top = newTopology("testIdlePublish");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -1009,7 +927,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String pubClientId = newClientId(top.getName()+"_pub");
String subClientId = newClientId(top.getName()+"_sub");
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
// Exercise idle timeouts. We won't have any direct
// evidence that an idle disconnect/reconnect happen
@@ -1035,7 +953,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
}
- public void testConnectRetryPub(String msg1, String msg2) throws Exception {
+ @Test
+ public void testConnectRetryPub() throws Exception {
Topology top = newTopology("testConnectRetryPub");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
@@ -1043,7 +962,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String pubClientId = newClientId(top.getName()+"_pub");
String subClientId = newClientId(top.getName()+"_sub");
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
// Exercise connection retry by first specifying
// a bogus server url then a good one.
@@ -1083,7 +1002,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT + 5, msgs.toArray(new String[0]));
}
- public void testConnectRetrySub(String msg1, String msg2) throws Exception {
+ @Test
+ public void testConnectRetrySub() throws Exception {
// Timing variances on shared machines can cause this test to fail
assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
@@ -1094,7 +1014,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
String pubClientId = newClientId(top.getName()+"_pub");
String subClientId = newClientId(top.getName()+"_sub");
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
// Exercise connection retry by first specifying
// a bogus server url then a good one.
@@ -1145,14 +1065,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT + 5, msgs.toArray(new String[0]));
}
- public void testSubscribeFnThrow(String msg1, String msg2) throws Exception {
+ @Test
+ public void testSubscribeFnThrow() throws Exception {
Topology top = newTopology("testSubscribeFnThrow");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<String> expectMsgs = new ArrayList<>(msgs);
expectMsgs.remove(0); // should only lose the 1st tuple
@@ -1186,14 +1107,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, expectMsgs.toArray(new String[0]));
}
- public void testPublishFnThrow(String msg1, String msg2) throws Exception {
+ @Test
+ public void testPublishFnThrow() throws Exception {
Topology top = newTopology("testPublishFnThrow");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
List<String> expectMsgs = new ArrayList<>(msgs);
expectMsgs.remove(0); // should only lose the 1st tuple
@@ -1230,14 +1152,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
* See mqtt/src/test/keystores/README for info about SSL/TLS and mosquitto
*/
- public void testSsl(String msg1, String msg2) throws Exception {
+ @Test
+ public void testSsl() throws Exception {
Topology top = newTopology("testSsl");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -1259,14 +1182,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
assertNotNull(sink);
}
- public void testSslClientAuth(String msg1, String msg2) throws Exception {
+ @Test
+ public void testSslClientAuth() throws Exception {
Topology top = newTopology("testSslClientAuth");
MsgGenerator mgen = new MsgGenerator(top.getName());
int qos = 0;
boolean retain = false;
String clientId = newClientId(top.getName());
String topic = getMqttTopics()[0];
- List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+ List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
TStream<String> s = PlumbingStreams.blockingOneShotDelay(
top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
index be96335..2ba6c00 100644
--- a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
+++ b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
@@ -18,8 +18,6 @@ under the License.
*/
package quarks.test.connectors.pubsub;
-import org.junit.Test;
-
/**
* PubSub connector globalization tests.
*/
@@ -27,24 +25,8 @@ public class PubSubGlobalTest extends PubSubTest {
private final String[] globalStrs = new String[] { "\u56db", "\u4e94", "\u516d" };
- /**
- * Test without a pub-sub service so no
- * cross job connections will be made.
- * @throws Exception
- */
- @Test
- public void testGlobalNoService() throws Exception {
- super.testNoService(globalStrs);
- }
-
- @Test(timeout=10000)
- public void testStdProviderServiceSingleSubscriber() throws Exception {
- super.testProviderServiceSingleSubscriber(globalStrs);
- }
-
- @Test(timeout=10000)
- public void testStdProviderServiceMultipleSubscriber() throws Exception {
- super.testProviderServiceMultipleSubscriber(globalStrs);
+ public String[] getStrs() {
+ return globalStrs;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
index 05ccc9a..68a9062 100644
--- a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
+++ b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
@@ -40,19 +40,8 @@ public class PubSubTest {
private final String[] strs = new String[] { "A", "B", "C" };
- @Test
- public void testNoService() throws Exception {
- testNoService(strs);
- }
-
- @Test(timeout=10000)
- public void testProviderServiceSingleSubscriber() throws Exception {
- testProviderServiceSingleSubscriber(strs);
- }
-
- @Test(timeout=10000)
- public void testProviderServiceMultipleSubscriber() throws Exception {
- testProviderServiceMultipleSubscriber(strs);
+ public String[] getStrs() {
+ return strs;
}
/**
@@ -60,10 +49,11 @@ public class PubSubTest {
* cross job connections will be made.
* @throws Exception
*/
- public void testNoService(String[] strs) throws Exception {
+ @Test
+ public void testNoService() throws Exception {
DirectProvider dp = new DirectProvider();
- TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+ TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
Tester testPub = publishedStream.topology().getTester();
Condition<Long> tcPub = testPub.tupleCount(publishedStream, 3);
@@ -95,18 +85,19 @@ public class PubSubTest {
return PublishSubscribe.subscribe(subscriber, topic, streamType);
}
- public void testProviderServiceSingleSubscriber(String[] strs) throws Exception {
+ @Test(timeout=10000)
+ public void testProviderServiceSingleSubscriber() throws Exception {
DirectProvider dp = new DirectProvider();
dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
- TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+ TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
Tester testPub = publishedStream.topology().getTester();
- Condition<List<String>> tcPub = testPub.streamContents(publishedStream, strs);
+ Condition<List<String>> tcPub = testPub.streamContents(publishedStream, getStrs());
TStream<String> subscribedStream = createSubscriber(dp, "t1", String.class);
Tester testSub = subscribedStream.topology().getTester();
- Condition<List<String>> tcSub = testSub.streamContents(subscribedStream, strs); // Expect all tuples
+ Condition<List<String>> tcSub = testSub.streamContents(subscribedStream, getStrs()); // Expect all tuples
Job js = dp.submit(subscribedStream.topology()).get();
// Give the subscriber a chance to setup.
@@ -125,26 +116,27 @@ public class PubSubTest {
jp.stateChange(Action.CLOSE);
}
- public void testProviderServiceMultipleSubscriber(String[] strs) throws Exception {
+ @Test(timeout=10000)
+ public void testProviderServiceMultipleSubscriber() throws Exception {
DirectProvider dp = new DirectProvider();
dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
- TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+ TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
Tester testPub = publishedStream.topology().getTester();
- Condition<List<String>> tcPub = testPub.streamContents(publishedStream, strs);
+ Condition<List<String>> tcPub = testPub.streamContents(publishedStream, getStrs());
TStream<String> subscribedStream1 = createSubscriber(dp, "t1", String.class);
Tester testSub1 = subscribedStream1.topology().getTester();
- Condition<List<String>> tcSub1 = testSub1.streamContents(subscribedStream1, strs);
+ Condition<List<String>> tcSub1 = testSub1.streamContents(subscribedStream1, getStrs());
TStream<String> subscribedStream2 = createSubscriber(dp, "t1", String.class);
Tester testSub2 = subscribedStream2.topology().getTester();
- Condition<List<String>> tcSub2 = testSub2.streamContents(subscribedStream2, strs);
+ Condition<List<String>> tcSub2 = testSub2.streamContents(subscribedStream2, getStrs());
TStream<String> subscribedStream3 = createSubscriber(dp, "t1", String.class);
Tester testSub3 = subscribedStream3.topology().getTester();
- Condition<List<String>> tcSub3 = testSub3.streamContents(subscribedStream3, strs);
+ Condition<List<String>> tcSub3 = testSub3.streamContents(subscribedStream3, getStrs());
Job js1 = dp.submit(subscribedStream1.topology()).get();
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
index b717579..bf7f343 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
@@ -18,8 +18,6 @@ under the License.
*/
package quarks.tests.connectors.wsclient.javax.websocket;
-import org.junit.Test;
-
/**
* WebSocketClient connector globalization tests.
*/
@@ -29,97 +27,20 @@ public class WebSocketClientGlobalTest extends WebSocketClientTest {
private final static String globalStr3 = "\u4e09\u4e09";
private final static String globalStr4 = "\u56db";
- @Test
- public void testGlobalBasicStaticStuff() {
- super.testBasicStaticStuff(globalStr1, globalStr2);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testGlobalTooManySendersNeg() {
- super.testTooManySendersNeg(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalJson() throws Exception {
- super.testJson(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalString() throws Exception {
- super.testString(globalStr1, globalStr2);
+ public String getStr1() {
+ return globalStr1;
}
- @Test
- public void testGlobalBytes() throws Exception {
- super.testBytes(globalStr1, globalStr2);
+ public String getStr2() {
+ return globalStr2;
}
- @Test
- public void testGlobalReconnect() throws Exception {
- super.testReconnect(globalStr1, globalStr2, globalStr3, globalStr4);
+ public String getStr3() {
+ return globalStr3;
}
- @Test
- public void testGlobalReconnectBytes() throws Exception {
- super.testReconnectBytes(globalStr1, globalStr2, globalStr3, globalStr4);
+ public String getStr4() {
+ return globalStr4;
}
- @Test
- public void testGlobalSslClientAuthSystemProperty() throws Exception {
- super.testSslClientAuthSystemProperty(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSsl() throws Exception {
- super.testSsl(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslReconnect() throws Exception {
- super.testSslReconnect(globalStr1, globalStr2, globalStr3, globalStr4);
- }
-
- @Test
- public void testGlobalSslNeg() throws Exception {
- super.testSslNeg(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslClientAuth() throws Exception {
- super.testSslClientAuth(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslClientAuthDefault() throws Exception {
- super.testSslClientAuthDefault(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslClientAuthMy2ndCertNeg() throws Exception {
- super.testSslClientAuthMy2ndCertNeg(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslClientAuthMy3rdCert() throws Exception {
- super.testSslClientAuthMy3rdCert(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslClientAuthNeg() throws Exception {
- super.testSslClientAuthNeg(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalPublicServer() throws Exception {
- super.testPublicServer(globalStr1, globalStr2);
- }
-
- @Test
- public void testGlobalSslPublicServer() throws Exception {
- super.testSslPublicServer(globalStr1, globalStr2);
- }
-
- public void testGlobalSslPublicServerBadTrustStoreSystemPropertyNeg() throws Exception {
- super.testSslPublicServerBadTrustStoreSystemPropertyNeg(globalStr1, globalStr2);
- }
}