You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by qu...@apache.org on 2016/05/27 17:33:37 UTC

[3/5] incubator-quarks git commit: [QUARKS-181] Simplify code

[QUARKS-181] Simplify code


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/3b66fac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/3b66fac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/3b66fac4

Branch: refs/heads/master
Commit: 3b66fac41599aa2401e6036045b382953ae47e9b
Parents: 46d7ac2
Author: Queenie Ma <qu...@gmail.com>
Authored: Thu May 26 15:13:20 2016 -0700
Committer: Queenie Ma <qu...@gmail.com>
Committed: Fri May 27 09:50:42 2016 -0700

----------------------------------------------------------------------
 .../connectors/file/FileStreamsGlobalTest.java  |  19 +-
 .../test/connectors/file/FileStreamsTest.java   |  30 +--
 .../FileStreamsTextFileWriterGlobalTest.java    |  86 +-------
 .../file/FileStreamsTextFileWriterTest.java     | 166 +++++---------
 .../test/connectors/http/HttpGlobalTest.java    |  21 +-
 .../quarks/test/connectors/http/HttpTest.java   |  15 +-
 .../connectors/jdbc/JdbcStreamsGlobalTest.java  |  41 +---
 .../test/connectors/jdbc/JdbcStreamsTest.java   | 124 +++++------
 .../kafka/KafkaStreamsGlobalTestManual.java     |  31 +--
 .../kafka/KafkaStreamsTestManual.java           |  60 ++---
 .../mqtt/MqttStreamsGlobalTestManual.java       | 111 +---------
 .../connectors/mqtt/MqttStreamsTestManual.java  | 220 ++++++-------------
 .../connectors/pubsub/PubSubGlobalTest.java     |  22 +-
 .../test/connectors/pubsub/PubSubTest.java      |  42 ++--
 .../websocket/WebSocketClientGlobalTest.java    |  95 +-------
 .../javax/websocket/WebSocketClientTest.java    | 209 +++++++-----------
 16 files changed, 359 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
index 46abbd3..91514fd 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsGlobalTest.java
@@ -18,31 +18,18 @@ under the License.
 */
 package quarks.test.connectors.file;
 
-import org.junit.Test;
-
 /**
  * FileStreams connector globalization tests.
  */
 public class FileStreamsGlobalTest extends FileStreamsTest {
 
-    String[] globalLines = new String[] {
+    private static final String[] globalLines = new String[] {
             "\u5b78\u800c\u6642\u7fd2\u4e4b",
             "\u4e0d\u4ea6\u8aaa\u4e4e"
     };
 
-    @Test
-    public void testGlobalTextFileReader() throws Exception {
-        super.testTextFileReader(globalLines);
-    }
-
-    @Test
-    public void testGlobalTextFileReaderProblemPaths() throws Exception {
-        super.testTextFileReaderProblemPaths(globalLines);
-    }
-
-    @Test
-    public void testGlobalTextFileReaderPrePost() throws Exception {
-        super.testTextFileReaderPrePost(globalLines);
+    public String[] getLines() {
+        return globalLines;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
index 7d6dea2..1d52e5e 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTest.java
@@ -58,6 +58,10 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
             "But make allowance for their doubting too;"                
     };
 
+    public String[] getLines() {
+        return stdLines;
+    }
+
     /**
      * Test that directory watcher creates the correct output.
      */
@@ -188,23 +192,9 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
 
     @Test
     public void testTextFileReader() throws Exception {
-        testTextFileReader(stdLines);
-    }
-
-    @Test
-    public void testTextFileReaderProblemPaths() throws Exception {
-        testTextFileReaderProblemPaths(stdLines);
-    }
-
-    @Test
-    public void testTextFileReaderPrePost() throws Exception {
-        testTextFileReaderPrePost(stdLines);
-    }
-
-    public void testTextFileReader(String[] stdLines) throws Exception {
         Topology t = newTopology("testTextFileReader");
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         String[] ucLines = Stream.of(lines)
                 .map(line -> line.toUpperCase())
                 .toArray(String[]::new);
@@ -227,10 +217,11 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
         }
     }
 
-    public void testTextFileReaderProblemPaths(String[] stdLines) throws Exception {
+    @Test
+    public void testTextFileReaderProblemPaths() throws Exception {
         Topology t = newTopology("testTextFileReaderProblemPaths");
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         String[] ucLines = Stream.of(lines)
                 .map(line -> line.toUpperCase())
                 .toArray(String[]::new);
@@ -258,10 +249,11 @@ public class FileStreamsTest extends TopologyAbstractTest implements DirectTestS
         }
     }
 
-    public void testTextFileReaderPrePost(String[] stdLines) throws Exception {
+    @Test
+    public void testTextFileReaderPrePost() throws Exception {
         Topology t = newTopology("testTextFileReaderPrePost");
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         String[] ucLines = Stream.of(lines)
                 .map(line -> line.toUpperCase())
                 .toArray(String[]::new);

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
index 8a372fb..86213c6 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterGlobalTest.java
@@ -18,99 +18,25 @@ under the License.
 */
 package quarks.test.connectors.file;
 
-import org.junit.Test;
-
 /**
  * FileStreamsTextFileWriter connector globalization tests.
  */
 public class FileStreamsTextFileWriterGlobalTest extends FileStreamsTextFileWriterTest {
 
-    String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
-    String[] globalLines = new String[] {
+    private static final String globalStr = "\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d";
+    private static final String[] globalLines = new String[] {
             "1-"+globalStr,
             "2-"+globalStr,
             "3-"+globalStr,
             "4-"+globalStr
     };
 
-    @Test
-    public void testGlobalOneFileCreated() throws Exception {
-        super.testOneFileCreated(globalLines);
-    }
-
-    @Test
-    public void testGlobalManyFiles() throws Exception {
-        super.testManyFiles(globalLines);
-    }
-
-    @Test
-    public void testGlobalManyFilesSlow() throws Exception {
-        super.testManyFilesSlow(globalLines);
-    }
-
-    @Test
-    public void testGlobalRetainCntBased() throws Exception {
-        super.testRetainCntBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalRetainAggSizeBased() throws Exception {
-        super.testRetainAggSizeBased(globalLines, globalStr);
-    }
-
-    @Test
-    public void testGlobalRetainAgeBased() throws Exception {
-        super.testRetainAgeBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalFlushImplicit() throws Exception {
-        super.testFlushImplicit(globalLines);
-    }
-
-    @Test
-    public void testGlobalFlushCntBased() throws Exception {
-        super.testFlushCntBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalFlushTimeBased() throws Exception {
-        super.testFlushTimeBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalFlushTupleBased() throws Exception {
-        super.testFlushTupleBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalCycleCntBased() throws Exception {
-        super.testCycleCntBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalCycleSizeBased() throws Exception {
-        super.testCycleSizeBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalCycleTimeBased() throws Exception {
-        super.testCycleTimeBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalCycleTupleBased() throws Exception {
-        super.testCycleTupleBased(globalLines);
-    }
-
-    @Test
-    public void testGlobalAllTimeBased() throws Exception {
-        super.testAllTimeBased(globalLines);
+    public String getStr() {
+        return globalStr;
     }
 
-    @Test
-    public void testGlobalWriterWatcherReader() throws Exception {
-        super.testWriterWatcherReader(globalLines);
+    public String[] getLines() {
+        return globalLines;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
----------------------------------------------------------------------
diff --git a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
index 74139b2..d9b25c2 100644
--- a/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
+++ b/connectors/file/src/test/java/quarks/test/connectors/file/FileStreamsTextFileWriterTest.java
@@ -72,6 +72,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
     
     private int TMO_SEC = 2;
 
+    public String getStr() {
+        return str;
+    }
+
+    public String[] getLines() {
+        return stdLines;
+    }
+
     @Test
     public void testFlushConfig() throws Exception {
         FileWriterFlushConfig<String> cfg;
@@ -271,92 +279,13 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
 
     @Test
     public void testOneFileCreated() throws Exception {
-        testOneFileCreated(stdLines);
-    }
-
-    @Test
-    public void testManyFiles() throws Exception {
-        testManyFiles(stdLines);
-    }
-
-    @Test
-    public void testManyFilesSlow() throws Exception {
-        testManyFilesSlow(stdLines);
-    }
-
-    @Test
-    public void testRetainCntBased() throws Exception {
-        testRetainCntBased(stdLines);
-    }
-
-    @Test
-    public void testRetainAggSizeBased() throws Exception {
-        testRetainAggSizeBased(stdLines, str);
-    }
-
-    @Test
-    public void testRetainAgeBased() throws Exception {
-        testRetainAgeBased(stdLines);
-    }
-
-    @Test
-    public void testFlushImplicit() throws Exception {
-        testFlushImplicit(stdLines);
-    }
-
-    @Test
-    public void testFlushCntBased() throws Exception {
-        testFlushCntBased(stdLines);
-    }
-
-    @Test
-    public void testFlushTimeBased() throws Exception {
-        testFlushTimeBased(stdLines);
-    }
-
-    @Test
-    public void testFlushTupleBased() throws Exception {
-        testFlushTupleBased(stdLines);
-    }
-
-    @Test
-    public void testCycleCntBased() throws Exception {
-        testCycleCntBased(stdLines);
-    }
-
-    @Test
-    public void testCycleSizeBased() throws Exception {
-        testCycleSizeBased(stdLines);
-    }
-
-    @Test
-    public void testCycleTimeBased() throws Exception {
-        testCycleTimeBased(stdLines);
-    }
-
-    @Test
-    public void testCycleTupleBased() throws Exception {
-        testCycleTupleBased(stdLines);
-    }
-
-    @Test
-    public void testAllTimeBased() throws Exception {
-        testAllTimeBased(stdLines);
-    }
-
-    @Test
-    public void testWriterWatcherReader() throws Exception {
-        testWriterWatcherReader(stdLines);
-    }
-
-    public void testOneFileCreated(String[] stdLines) throws Exception {
         // all lines into a single (the first) file
         Topology t = newTopology("testOneFileCreated");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net all in one, the first, file
@@ -373,13 +302,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         assertNotNull(sink);
     }
 
-    public void testManyFiles(String[] stdLines) throws Exception {
+    @Test
+    public void testManyFiles() throws Exception {
         Topology t = newTopology("testManyFiles");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net one tuples per file
@@ -400,13 +330,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testManyFilesSlow(String[] stdLines) throws Exception {
+    @Test
+    public void testManyFilesSlow() throws Exception {
         Topology t = newTopology("testManyFilesSlow");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net one tuples per file
@@ -429,14 +360,15 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
                 basePath, expResults);
     }
 
-    public void testRetainCntBased(String[] stdLines) throws Exception {
+    @Test
+    public void testRetainCntBased() throws Exception {
         // more lines than configured retained numFiles; only keep the last numFiles
         Topology t = newTopology("testRetainCntBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // net one tuples per file
@@ -458,20 +390,21 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testRetainAggSizeBased(String[] stdLines, String str) throws Exception {
+    @Test
+    public void testRetainAggSizeBased() throws Exception {
         // more aggsize than configured; only keep aggsize worth
         Topology t = newTopology("testRetainAggSizeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // net one tuple per file
         List<List<String>> expResults = buildExpResults(lines, tuple -> true);
         // agg size only enough for last two lines
-        long aggregateFileSize = 2 * (("1-"+str).length() + 1/*eol*/);
+        long aggregateFileSize = 2 * (("1-"+getStr()).length() + 1/*eol*/);
         expResults.remove(0);
         expResults.remove(0);
         assertEquals(2, expResults.size());
@@ -488,13 +421,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testRetainAgeBased(String[] stdLines) throws Exception {
+    @Test
+    public void testRetainAgeBased() throws Exception {
         Topology t = newTopology("testRetainAgeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         int keepCnt = 2;  // only keep the last n files with throttling, age,
@@ -534,13 +468,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
                 basePath, expResults);
     }
 
-    public void testFlushImplicit(String[] stdLines) throws Exception {
+    @Test
+    public void testFlushImplicit() throws Exception {
         Topology t = newTopology("testFlushImplicit");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net all in one, the first, file
@@ -558,13 +493,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testFlushCntBased(String[] stdLines) throws Exception {
+    @Test
+    public void testFlushCntBased() throws Exception {
         Topology t = newTopology("testFlushCntBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net all in one, the first, file
@@ -582,13 +518,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testFlushTimeBased(String[] stdLines) throws Exception {
+    @Test
+    public void testFlushTimeBased() throws Exception {
         Topology t = newTopology("testFlushTimeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net all in one, the first, file
@@ -611,13 +548,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
                 basePath, expResults);
     }
 
-    public void testFlushTupleBased(String[] stdLines) throws Exception {
+    @Test
+    public void testFlushTupleBased() throws Exception {
         Topology t = newTopology("testFlushTupleBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // net all in one, the first, file
@@ -636,13 +574,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testCycleCntBased(String[] stdLines) throws Exception {
+    @Test
+    public void testCycleCntBased() throws Exception {
         Topology t = newTopology("testCycleCntBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // net two tuples per file
@@ -664,13 +603,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testCycleSizeBased(String[] stdLines) throws Exception {
+    @Test
+    public void testCycleSizeBased() throws Exception {
         Topology t = newTopology("testCycleSizeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // net one tuple per file 
@@ -689,13 +629,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testCycleTimeBased(String[] stdLines) throws Exception {
+    @Test
+    public void testCycleTimeBased() throws Exception {
         Topology t = newTopology("testCycleTimeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // net one tuple per file with 250msec cycle config and 1 throttle
@@ -720,13 +661,14 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
                 basePath, expResults);
     }
 
-    public void testCycleTupleBased(String[] stdLines) throws Exception {
+    @Test
+    public void testCycleTupleBased() throws Exception {
         Topology t = newTopology("testCycleTupleBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         // build expected results
         // a tuple based config / predicate.  in this case should end up with 3 files.
@@ -746,14 +688,15 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testAllTimeBased(String[] stdLines) throws Exception {
+    @Test
+    public void testAllTimeBased() throws Exception {
         // exercise case with multiple timer based policies
         Topology t = newTopology("testAllTimeBased");
         
         // establish a base path
         Path basePath = createTempFile("test1", "txt", new String[0]);
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
         
         // build expected results
         // keep all given age and TMO_SEC
@@ -774,7 +717,8 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         completeAndValidateWriter(t, TMO_SEC, basePath, expResults);
     }
 
-    public void testWriterWatcherReader(String[] stdLines) throws Exception {
+    @Test
+    public void testWriterWatcherReader() throws Exception {
         // verify all the pieces work together
         Topology t = newTopology("testWriterWatcherReader");
         
@@ -782,7 +726,7 @@ public class FileStreamsTextFileWriterTest extends TopologyAbstractTest implemen
         Path dir = Files.createTempDirectory(testDirPrefix);
         Path basePath = dir.resolve("writerCreated");
         
-        String[] lines = stdLines;
+        String[] lines = getLines();
 
         System.out.println("########## "+t.getName());
         

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java b/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
index 7689061..a413f1a 100644
--- a/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
+++ b/connectors/http/src/test/java/quarks/test/connectors/http/HttpGlobalTest.java
@@ -18,25 +18,22 @@ under the License.
 */
 package quarks.test.connectors.http;
 
-import org.junit.Test;
-
-import com.google.gson.JsonObject;
-
 /**
  * This globalization test goes against http://httpbin.org
  * a freely available web-server for testing requests.
  *
  */
-public class HttpGlobalTest {
+public class HttpGlobalTest extends HttpTest {
 
-    @Test
-    public void testGlobalJsonGet() throws Exception {
-        JsonObject request = new JsonObject();
-        request.addProperty("a", "\u5b57\u6bcd");
-        request.addProperty("b", "\u56db\u5341\u4e8c");
+    private static final String globalProp1 = "\u5b57\u6bcd";
+    private static final String globalProp2 = "\u56db\u5341\u4e8c";
+
+    public String getProp1() {
+        return globalProp1;
+    }
 
-        HttpTest ht = new HttpTest();
-        ht.testJsonGet(request);
+    public String getProp2() {
+        return globalProp2;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
----------------------------------------------------------------------
diff --git a/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java b/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
index 8abcf1f..d75b077 100644
--- a/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
+++ b/connectors/http/src/test/java/quarks/test/connectors/http/HttpTest.java
@@ -46,6 +46,17 @@ import quarks.topology.tester.Tester;
  */
 public class HttpTest {
 
+    private static final String prop1 = "abc";
+    private static final String prop2 = "42";
+
+    public String getProp1() {
+        return prop1;
+    }
+
+    public String getProp2() {
+        return prop2;
+    }
+
     @Test
     public void testGet() throws Exception {
         
@@ -129,8 +140,8 @@ public class HttpTest {
     @Test
     public void testJsonGet() throws Exception {
         JsonObject request = new JsonObject();
-        request.addProperty("a", "abc");
-        request.addProperty("b", "42");
+        request.addProperty("a", getProp1());
+        request.addProperty("b", getProp2());
 
         testJsonGet(request);
     }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
index bf0c8c1..23d9b3b 100644
--- a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
+++ b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsGlobalTest.java
@@ -20,7 +20,6 @@ package quarks.test.connectors.jdbc;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.junit.Test;
 
 /**
  * JdbcStreams connector globalization tests.
@@ -51,44 +50,12 @@ public class JdbcStreamsGlobalTest extends JdbcStreamsTest {
         }
     }
 
-    @Test
-    public void testGlobalBasicRead() throws Exception {
-        super.testBasicRead(globalPersonList, globalPersonIdList);
+    public List<Person> getPersonList() {
+        return globalPersonList;
     }
 
-    @Test
-    public void testGlobalBasicRead2() throws Exception {
-        super.testBasicRead2(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBasicWrite() throws Exception {
-        super.testBasicWrite(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBasicWrite2() throws Exception {
-        super.testBasicWrite2(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBadConnectFn() throws Exception {
-        super.testBadConnectFn(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBadSQL() throws Exception {
-        super.testBadSQL(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBadSetParams() throws Exception {
-        super.testBadSetParams(globalPersonList, globalPersonIdList);
-    }
-
-    @Test
-    public void testGlobalBadResultHandler() throws Exception {
-        super.testBadResultHandler(globalPersonList, globalPersonIdList);
+    public List<PersonId> getPersonIdList() {
+        return globalPersonIdList;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
----------------------------------------------------------------------
diff --git a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
index 0f0871a..d0294ee 100644
--- a/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
+++ b/connectors/jdbc/src/test/java/quarks/test/connectors/jdbc/JdbcStreamsTest.java
@@ -107,6 +107,14 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         }
     }
 
+    public List<Person> getPersonList() {
+        return personList;
+    }
+
+    public List<PersonId> getPersonIdList() {
+        return personIdList;
+    }
+
     DataSource getDerbyEmbeddedDataSource(String database) throws Exception
     {
         // Avoid a compile-time dependency to the jdbc driver.
@@ -236,56 +244,17 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
 
     @Test
     public void testBasicRead() throws Exception {
-        testBasicRead(personList, personIdList);
-    }
-
-    @Test
-    public void testBasicRead2() throws Exception {
-        testBasicRead2(personList, personIdList);
-    }
-
-    @Test
-    public void testBasicWrite() throws Exception {
-        testBasicWrite(personList, personIdList);
-    }
-
-    @Test
-    public void testBasicWrite2() throws Exception {
-        testBasicWrite2(personList, personIdList);
-    }
-
-    @Test
-    public void testBadConnectFn() throws Exception {
-        testBadConnectFn(personList, personIdList);
-    }
-
-    @Test
-    public void testBadSQL() throws Exception {
-        testBadSQL(personList, personIdList);
-    }
-
-    @Test
-    public void testBadSetParams() throws Exception {
-        testBadSetParams(personList, personIdList);
-    }
-
-    @Test
-    public void testBadResultHandler() throws Exception {
-        testBadResultHandler(personList, personIdList);
-    }
-
-    public void testBasicRead(List<Person> personList, List<PersonId> personIdList) throws Exception {
         Topology t = this.newTopology("testBasicRead");
         
-        populatePersonsTable(personList);
-        List<String> expected = expectedPersons(person->true, personList);
+        populatePersonsTable(getPersonList());
+        List<String> expected = expectedPersons(person->true, getPersonList());
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
                 dataSource -> connect(dataSource));
 
         // Create a stream of Person from a stream of ids
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 0/*msec*/);
+        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 0/*msec*/);
         TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
         
         rcvd.sink(tuple -> System.out.println(
@@ -293,13 +262,14 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
 
-    public void testBasicRead2(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBasicRead2() throws Exception {
         Topology t = newTopology("testBasicRead2");
         // same as testBasic but use the explicit PreparedStatement forms
         // of executeStatement().
         
-        populatePersonsTable(personList);
-        List<String> expected = expectedPersons(person->true, personList);
+        populatePersonsTable(getPersonList());
+        List<String> expected = expectedPersons(person->true, getPersonList());
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
@@ -308,7 +278,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         // Create a stream of Person from a stream of ids
         // Delay so this runs after populating the db above
         TStream<PersonId> personIds =  PlumbingStreams.blockingOneShotDelay(
-                t.collection(personIdList), 3, TimeUnit.SECONDS);
+                t.collection(getPersonIdList()), 3, TimeUnit.SECONDS);
         TStream<Person> rcvdPerson = db.executeStatement(personIds,
                 (cn) -> cn.prepareStatement("SELECT id, firstname, lastname, gender, age"
                         + " FROM persons WHERE id = ?"),
@@ -330,18 +300,19 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
     
-    public void testBasicWrite(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBasicWrite() throws Exception {
         Topology t = newTopology("testBasicWrite");
         
         truncatePersonsTable();
-        List<String> expected = expectedPersons(person->true, personList);
+        List<String> expected = expectedPersons(person->true, getPersonList());
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
                 dataSource -> connect(dataSource));
         
         // Add stream of Person to the db
-        TStream<Person> s = t.collection(personList);
+        TStream<Person> s = t.collection(getPersonList());
         TSink<Person> sink = db.executeStatement(s,
                 () -> "INSERT INTO persons VALUES(?,?,?,?,?)",
                 (tuple,stmt) -> {
@@ -355,7 +326,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         assertNotNull(sink);
         
         // Use the same code as testBasicRead to verify the write worked.
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 3000/*msec*/);
+        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
         TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
         
         rcvd.sink(tuple -> System.out.println(
@@ -363,20 +334,21 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
     
-    public void testBasicWrite2(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBasicWrite2() throws Exception {
         Topology t = newTopology("testBasicWrite2");
         // same as testBasic but use the explicit PreparedStatement forms
         // of executeStatement().
         
         truncatePersonsTable();
-        List<String> expected = expectedPersons(person->true, personList);
+        List<String> expected = expectedPersons(person->true, getPersonList());
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
                 dataSource -> connect(dataSource));
         
         // Add stream of Person to the db
-        TStream<Person> s = t.collection(personList);
+        TStream<Person> s = t.collection(getPersonList());
         TSink<Person> sink = db.executeStatement(s,
                 (cn) -> cn.prepareStatement("INSERT into PERSONS values(?,?,?,?,?)"),
                 (tuple,stmt) -> {
@@ -391,7 +363,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         assertNotNull(sink);
         
         // Use the same code as testBasicRead to verify the write worked.
-        TStream<Person> rcvdPerson = readPersonsTable(t, db, personIdList, 3000/*msec*/);
+        TStream<Person> rcvdPerson = readPersonsTable(t, db, getPersonIdList(), 3000/*msec*/);
         TStream<String> rcvd = rcvdPerson.map(person -> person.toString());
         
         rcvd.sink(tuple -> System.out.println(
@@ -399,7 +371,8 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
     
-    public void testBadConnectFn(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBadConnectFn() throws Exception {
         Topology t = newTopology("testBadConnectFn");
         // connFn is only called for initial connect or reconnect
         // following certain failures.
@@ -411,9 +384,9 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         // right before it?), so that we can verify the conn is closed and
         // then reconnected
         
-        populatePersonsTable(personList);
-        List<String> expected = expectedPersons(p->true, personList.subList(1, personList.size()));
-        int expectedExcCnt = personList.size() - expected.size();
+        populatePersonsTable(getPersonList());
+        List<String> expected = expectedPersons(p->true, getPersonList().subList(1, getPersonList().size()));
+        int expectedExcCnt = getPersonList().size() - expected.size();
 
         AtomicInteger connFnCnt = new AtomicInteger();
         JdbcStreams db = new JdbcStreams(t,
@@ -427,7 +400,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
 
         // Create a stream of Person from a stream of ids
         AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(personIdList);
+        TStream<PersonId> personIds = t.collection(getPersonIdList());
         TStream<Person> rcvdPerson = db.executeStatement(personIds,
                 () -> "SELECT id, firstname, lastname, gender, age"
                         + " FROM persons WHERE id = ?",
@@ -454,17 +427,18 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, SEC_TIMEOUT, expected.toArray(new String[0]));
         assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
     }
-        
-    public void testBadSQL(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    
+    @Test
+    public void testBadSQL() throws Exception {
         Topology t = newTopology("testBadSQL");
         // the statement is nominally "retrieved" only once, not per-tuple.
         // hence, there's not much sense in trying to simulate it
         // getting called unsuccessfully, then successfully, etc.
         // however, verify the result handler gets called appropriately.
         
-        populatePersonsTable(personList);
+        populatePersonsTable(getPersonList());
         List<String> expected = Collections.emptyList();
-        int expectedExcCnt = personList.size() - expected.size();
+        int expectedExcCnt = getPersonList().size() - expected.size();
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
@@ -472,7 +446,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
 
         // Create a stream of Person from a stream of ids
         AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(personIdList);
+        TStream<PersonId> personIds = t.collection(getPersonIdList());
         TStream<Person> rcvdPerson = db.executeStatement(personIds,
                 () -> "SELECT id, firstname, lastname, gender, age"
                         + " FROM persons WHERE BOGUS_XYZZY id = ?",
@@ -500,13 +474,14 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
     }
     
-    public void testBadSetParams(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBadSetParams() throws Exception {
         Topology t = newTopology("testBadSetParams");
         // exercise and validate  behavior with transient parameter setter failures
         
-        populatePersonsTable(personList);
-        List<String> expected = expectedPersons(newOddIdPredicate(), personList);
-        int expectedExcCnt = personList.size() - expected.size();
+        populatePersonsTable(getPersonList());
+        List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
+        int expectedExcCnt = getPersonList().size() - expected.size();
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
@@ -514,7 +489,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
 
         // Create a stream of Person from a stream of ids
         AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(personIdList);
+        TStream<PersonId> personIds = t.collection(getPersonIdList());
         TStream<Person> rcvdPerson = db.executeStatement(personIds,
                 () -> "SELECT id, firstname, lastname, gender, age"
                         + " FROM persons WHERE id = ?",
@@ -545,13 +520,14 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
         assertEquals("executionExcCnt", expectedExcCnt, executionExcCnt.get());
     }
     
-    public void testBadResultHandler(List<Person> personList, List<PersonId> personIdList) throws Exception {
+    @Test
+    public void testBadResultHandler() throws Exception {
         Topology t = newTopology("testBadResultHandler");
         // exercise and validate behavior with transient result handler failures
         
-        populatePersonsTable(personList);
-        List<String> expected = expectedPersons(newOddIdPredicate(), personList);
-        int expectedExcCnt = personList.size() - expected.size();
+        populatePersonsTable(getPersonList());
+        List<String> expected = expectedPersons(newOddIdPredicate(), getPersonList());
+        int expectedExcCnt = getPersonList().size() - expected.size();
 
         JdbcStreams db = new JdbcStreams(t,
                 () -> getDataSource(DB_NAME),
@@ -559,7 +535,7 @@ public class JdbcStreamsTest  extends ConnectorTestBase {
 
         // Create a stream of Person from a stream of ids
         AtomicInteger executionExcCnt = new AtomicInteger();
-        TStream<PersonId> personIds = t.collection(personIdList);
+        TStream<PersonId> personIds = t.collection(getPersonIdList());
         TStream<Person> rcvdPerson = db.executeStatement(personIds,
                 () -> "SELECT id, firstname, lastname, gender, age"
                         + " FROM persons WHERE id = ?",

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
index 33810b0..9ec13f6 100644
--- a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
+++ b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsGlobalTestManual.java
@@ -18,38 +18,19 @@ under the License.
 */
 package quarks.test.connectors.kafka;
 
-import org.junit.Test;
-
 /**
  * KafkaStreams connector globalization tests.
  */
 public class KafkaStreamsGlobalTestManual extends KafkaStreamsTestManual {
-    private final String globalMsg1 = "\u4f60\u597d";
-    private final String globalMsg2 = "\u4f60\u5728\u55ce";
-
-    @Test
-    public void testGlobalSimple() throws Exception {
-        super.testSimple(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalWithKey() throws Exception {
-        super.testWithKey(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalPubSubBytes() throws Exception {
-        super.testPubSubBytes(globalMsg1, globalMsg2);
-    }
+    private static final String globalMsg1 = "\u4f60\u597d";
+    private static final String globalMsg2 = "\u4f60\u5728\u55ce";
 
-    @Test
-    public void testGlobalMultiPub() throws Exception {
-        super.testMultiPub(globalMsg1, globalMsg2);
+    public String getMsg1() {
+        return globalMsg1;
     }
 
-    @Test(expected=IllegalStateException.class)
-    public void testGlobalMultiSubNeg() throws Exception {
-        super.testMultiSubNeg(globalMsg1, globalMsg2);
+    public String getMsg2() {
+        return globalMsg2;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
index 76d13ad..79b7ab2 100644
--- a/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
+++ b/connectors/kafka/src/test/java/quarks/test/connectors/kafka/KafkaStreamsTestManual.java
@@ -46,6 +46,14 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
     private final String msg1 = "Hello";
     private final String msg2 = "Are you there?";
     
+    public String getMsg1() {
+        return msg1;
+    }
+
+    public String getMsg2() {
+        return msg2;
+    }
+
     private String[] getKafkaTopics() {
         String csvTopics = System.getProperty("quarks.test.connectors.kafka.csvTopics", "testTopic1,testTopic2");
         String[] topics = csvTopics.split(",");
@@ -98,36 +106,12 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
     }
 
     @Test
-    public void testGlobalSimple() throws Exception {
-        testSimple(msg1, msg2);
-    }
-
-    @Test
-    public void testGlobalWithKey() throws Exception {
-        testWithKey(msg1, msg2);
-    }
-
-    @Test
-    public void testGlobalPubSubBytes() throws Exception {
-        testPubSubBytes(msg1, msg2);
-    }
-
-    @Test
-    public void testGlobalMultiPub() throws Exception {
-        testMultiPub(msg1, msg2);
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testGlobalMultiSubNeg() throws Exception {
-        testMultiSubNeg(msg1, msg2);
-    }
-
-    public void testSimple(String msg1, String msg2) throws Exception {
+    public void testSimple() throws Exception {
         Topology t = newTopology("testSimple");
         MsgGenerator mgen = new MsgGenerator(t.getName());
         String topic = getKafkaTopics()[0];
         String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                         t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -149,12 +133,13 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
         assertNotNull(sink);
     }
 
-    public void testWithKey(String msg1, String msg2) throws Exception {
+    @Test
+    public void testWithKey() throws Exception {
         Topology t = newTopology("testWithKey");
         MsgGenerator mgen = new MsgGenerator(t.getName());
         String topic = getKafkaTopics()[0];
         String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<Rec> recs = new ArrayList<>();
         int i = 0;
         for (String msg : msgs) {
@@ -193,12 +178,13 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
 
-    public void testPubSubBytes(String msg1, String msg2) throws Exception {
+    @Test
+    public void testPubSubBytes() throws Exception {
         Topology t = newTopology("testPubSubBytes");
         MsgGenerator mgen = new MsgGenerator(t.getName());
         String topic = getKafkaTopics()[0];
         String groupId = newGroupId(t.getName());
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<Rec> recs = new ArrayList<>();
         int i = 0;
         for (String msg : msgs) {
@@ -236,14 +222,15 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
         completeAndValidate("", t, rcvd, mgen, SEC_TIMEOUT, expected.toArray(new String[0]));
     }
 
-    public void testMultiPub(String msg1, String msg2) throws Exception {
+    @Test
+    public void testMultiPub() throws Exception {
         Topology t = newTopology("testMultiPub");
         MsgGenerator mgen = new MsgGenerator(t.getName());
         String topic1 = getKafkaTopics()[0];
         String topic2 = getKafkaTopics()[1];
         String groupId = newGroupId(t.getName());
-        List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
-        List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
         List<String> msgs = new ArrayList<>(msgs1);
         msgs.addAll(msgs2);
         
@@ -276,14 +263,15 @@ public class KafkaStreamsTestManual extends ConnectorTestBase {
         assertNotSame(sink1, sink2);
     }
 
-    public void testMultiSubNeg(String msg1, String msg2) throws Exception {
+    @Test(expected=IllegalStateException.class)
+    public void testMultiSubNeg() throws Exception {
         Topology t = newTopology("testMultiSubNeg");
         MsgGenerator mgen = new MsgGenerator(t.getName());
         String topic1 = getKafkaTopics()[0];
         String topic2 = getKafkaTopics()[1];
         String groupId = newGroupId(t.getName());
-        List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
-        List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
         
         // Multiple subscribe() on a single connection.
         // Currently, w/Kafka0.8.2.2, we only support a single

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
index 49c3de9..995e0c4 100644
--- a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
+++ b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsGlobalTestManual.java
@@ -18,8 +18,6 @@ under the License.
 */
 package quarks.test.connectors.mqtt;
 
-import org.junit.Test;
-
 /**
  * MqttStreams globalization tests
  * <p>
@@ -35,113 +33,12 @@ public class MqttStreamsGlobalTestManual extends MqttStreamsTestManual {
     private final String globalMsg1 = "\u4f60\u597d";
     private final String globalMsg2 = "\u4f60\u5728\u55ce";
 
-    @Test
-    public void testGlobalStringPublish() throws Exception {
-        super.testStringPublish(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalAutoClientId() throws Exception {
-        super.testAutoClientId(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalQoS1() throws Exception {
-        super.testQoS1(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalQoS2() throws Exception {
-        super.testQoS2(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalGenericPublish() throws Exception {
-        super.testGenericPublish(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalMultiConnector() throws Exception {
-        super.testMultiConnector(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalMultiTopicPublish() throws Exception {
-        super.testMultiTopicPublish(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalMultiTopicSubscribe() throws Exception {
-        super.testMultiTopicSubscribe(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalConnectFail() throws Exception {
-        super.testConnectFail(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalRetainedFalse() throws Exception {
-        super.testRetainedFalse(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalRetainedTrue() throws Exception {
-        super.testRetainedTrue(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalMultipleServerURL() throws Exception {
-        super.testMultipleServerURL(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalActionTime() throws Exception {
-        super.testActionTime(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalIdleSubscribe() throws Exception {
-        super.testIdleSubscribe(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalIdlePublish() throws Exception {
-        super.testIdlePublish(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalConnectRetryPub() throws Exception {
-        super.testConnectRetryPub(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalConnectRetrySub() throws Exception {
-        super.testConnectRetrySub(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalSubscribeFnThrow() throws Exception {
-        super.testSubscribeFnThrow(globalMsg1, globalMsg2);
-    }
-
-    @Test
-    public void testGlobalPublishFnThrow() throws Exception {
-        super.testPublishFnThrow(globalMsg1, globalMsg2);
-    }
-
-    /*
-     * See mqtt/src/test/keystores/README for info about SSL/TLS and mosquitto
-     */
-
-    @Test
-    public void testGlobalSsl() throws Exception {
-        super.testSsl(globalMsg1, globalMsg2);
+    public String getMsg1() {
+        return globalMsg1;
     }
 
-    @Test
-    public void testGlobalSslClientAuth() throws Exception {
-        super.testSslClientAuth(globalMsg1, globalMsg2);
+    public String getMsg2() {
+        return globalMsg2;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
----------------------------------------------------------------------
diff --git a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
index be5ec4a..b49df9c 100644
--- a/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
+++ b/connectors/mqtt/src/test/java/quarks/test/connectors/mqtt/MqttStreamsTestManual.java
@@ -83,6 +83,14 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
     private final String msg1 = "Hello";
     private final String msg2 = "Are you there?";
     
+    public String getMsg1() {
+        return msg1;
+    }
+
+    public String getMsg2() {
+        return msg2;
+    }
+
     @Before
     public void setupAuthInfo() {
         authInfo.clear();
@@ -185,117 +193,13 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
 
     @Test
     public void testStringPublish() throws Exception {
-        testStringPublish(msg1, msg2);
-    }
-
-    @Test
-    public void testAutoClientId() throws Exception {
-        testAutoClientId(msg1, msg2);
-    }
-
-    @Test
-    public void testQoS1() throws Exception {
-        testQoS1(msg1, msg2);
-    }
-
-    @Test
-    public void testQoS2() throws Exception {
-        testQoS2(msg1, msg2);
-    }
-
-    @Test
-    public void testGenericPublish() throws Exception {
-        testGenericPublish(msg1, msg2);
-    }
-
-    @Test
-    public void testMultiConnector() throws Exception {
-        testMultiConnector(msg1, msg2);
-    }
-
-    @Test
-    public void testMultiTopicPublish() throws Exception {
-        testMultiTopicPublish(msg1, msg2);
-    }
-
-    @Test
-    public void testMultiTopicSubscribe() throws Exception {
-        testMultiTopicSubscribe(msg1, msg2);
-    }
-
-    @Test
-    public void testConnectFail() throws Exception {
-        testConnectFail(msg1, msg2);
-    }
-
-    @Test
-    public void testRetainedFalse() throws Exception {
-        testRetainedFalse(msg1, msg2);
-    }
-
-    @Test
-    public void testRetainedTrue() throws Exception {
-        testRetainedTrue(msg1, msg2);
-    }
-
-    @Test
-    public void testMultipleServerURL() throws Exception {
-        testMultipleServerURL(msg1, msg2);
-    }
-
-    @Test
-    public void testActionTime() throws Exception {
-        testActionTime(msg1, msg2);
-    }
-
-    @Test
-    public void testIdleSubscribe() throws Exception {
-        testIdleSubscribe(msg1, msg2);
-    }
-
-    @Test
-    public void testIdlePublish() throws Exception {
-        testIdlePublish(msg1, msg2);
-    }
-
-    @Test
-    public void testConnectRetryPub() throws Exception {
-        testConnectRetryPub(msg1, msg2);
-    }
-
-    @Test
-    public void testConnectRetrySub() throws Exception {
-        testConnectRetrySub(msg1, msg2);
-    }
-
-    @Test
-    public void testSubscribeFnThrow() throws Exception {
-        testSubscribeFnThrow(msg1, msg2);
-    }
-
-    @Test
-    public void testPublishFnThrow() throws Exception {
-        testPublishFnThrow(msg1, msg2);
-    }
-
-    @Test
-    public void testSsl() throws Exception {
-        testSsl(msg1, msg2);
-    }
-
-    @Test
-    public void testSslClientAuth() throws Exception {
-        testSslClientAuth(msg1, msg2);
-    }
-
-    public void testStringPublish(String msg1, String msg2) throws Exception {
         Topology top = newTopology("testStringPublish");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -314,13 +218,14 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         assertNotNull(sink);
     }
     
-    public void testAutoClientId(String msg1, String msg2) throws Exception {
+    @Test
+    public void testAutoClientId() throws Exception {
         Topology top = newTopology("testAutoClientId");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -335,14 +240,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate("some-auto-clientId", top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testQoS1(String msg1, String msg2) throws Exception {
+    @Test
+    public void testQoS1() throws Exception {
         Topology top = newTopology("testQoS1");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 1;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -360,15 +266,16 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
 
         completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
-    
-    public void testQoS2(String msg1, String msg2) throws Exception {
+
+    @Test
+    public void testQoS2() throws Exception {
         Topology top = newTopology("testQoS2");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 2;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -389,7 +296,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testGenericPublish(String msg1, String msg2) throws Exception {
+    @Test
+    public void testGenericPublish() throws Exception {
         Topology top = newTopology("testGenericPublish");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -398,7 +306,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         // avoid mucking up other test subscribers expecting UTF-8 string payloads
         // and use a different topic
         String topic = getMqttTopics()[0] + "-Generic";
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<String> expMsgsAsStr = 
                 msgs
                 .stream()
@@ -429,7 +337,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         assertNotNull(sink);
     }
     
-    public void testMultiConnector(String msg1, String msg2) throws Exception {
+    @Test
+    public void testMultiConnector() throws Exception {
         Topology top = newTopology("testMultiConnector");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -437,7 +346,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String pubClientId = newClientId(top.getName())+"_pub";
         String subClientId = newClientId(top.getName())+"_sub";
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -455,7 +364,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(pubClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testMultiTopicPublish(String msg1, String msg2) throws Exception {
+    @Test
+    public void testMultiTopicPublish() throws Exception {
         Topology top = newTopology("testMultiTopicPublish");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -463,8 +373,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String clientId = newClientId(top.getName());
         String topic1 = getMqttTopics()[0];
         String topic2 = getMqttTopics()[1];
-        List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
-        List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
         // create an interleaved list
         List<Msg> msgs = new ArrayList<>();
         for (int i = 0; i < msgs1.size(); i++) {
@@ -519,7 +429,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         assertTrue("valid:" + tc, tc.valid());
     }
     
-    public void testMultiTopicSubscribe(String msg1, String msg2) throws Exception {
+    @Test
+    public void testMultiTopicSubscribe() throws Exception {
         Topology top = newTopology("testMultiTopicSubscribe");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -528,8 +439,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String topic1 = getMqttTopics()[0] + "/1";
         String topic2 = getMqttTopics()[0] + "/2";
         String topics = getMqttTopics()[0] + "/+";
-        List<String> msgs1 = createMsgs(mgen, topic1, msg1, msg2);
-        List<String> msgs2 = createMsgs(mgen, topic2, msg1, msg2);
+        List<String> msgs1 = createMsgs(mgen, topic1, getMsg1(), getMsg2());
+        List<String> msgs2 = createMsgs(mgen, topic2, getMsg1(), getMsg2());
         List<String> msgs = new ArrayList<>();
         msgs.addAll(msgs1);
         msgs.addAll(msgs2);
@@ -567,14 +478,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         mqtt.subscribe(topic, qos); // should throw
     }
     
-    public void testConnectFail(String msg1, String msg2) throws Exception {
+    @Test
+    public void testConnectFail() throws Exception {
         Topology top = newTopology("testConnectFail");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -621,7 +533,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         return retainedMsg;
     }
 
-    public void testRetainedFalse(String msg1, String msg2) throws Exception {
+    @Test
+    public void testRetainedFalse() throws Exception {
         Topology top = newTopology("testRetainedFalse");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -637,7 +550,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         System.out.println("=============== setup complete");
         
         // verify the next connect/subscribe [doesn't] sees the retain and then new msgs
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<String> expMsgsAsStr = 
                 msgs
                 .stream()
@@ -663,7 +576,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(clientId, top, rcvdAsStr, mgen, SEC_TIMEOUT, expMsgsAsStr.toArray(new String[0]));
     }
 
-    public void testRetainedTrue(String msg1, String msg2) throws Exception {
+    @Test
+    public void testRetainedTrue() throws Exception {
         Topology top = newTopology("testRetainedTrue");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -678,7 +592,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         System.out.println("=============== setup complete");
         
         // verify the next connect/subscribe [doesn't] sees the retain and then new msgs
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<String> expMsgsAsStr = 
                 msgs
                 .stream()
@@ -912,14 +826,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         propTester.checkAll();
     }
     
-    public void testMultipleServerURL(String msg1, String msg2) throws Exception {
+    @Test
+    public void testMultipleServerURL() throws Exception {
         Topology top = newTopology("testMultipleServerURL");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -937,14 +852,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testActionTime(String msg1, String msg2) throws Exception {
+    @Test
+    public void testActionTime() throws Exception {
         Topology top = newTopology("testActionTime");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -962,7 +878,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testIdleSubscribe(String msg1, String msg2) throws Exception {
+    @Test
+    public void testIdleSubscribe() throws Exception {
         Topology top = newTopology("testIdleSubscribe");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -970,7 +887,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String pubClientId = newClientId(top.getName()+"_pub");
         String subClientId = newClientId(top.getName()+"_sub");
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
 
         // Exercise idle timeouts.  We won't have any direct
         // evidence that an idle disconnect/reconnect happen
@@ -1001,7 +918,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testIdlePublish(String msg1, String msg2) throws Exception {
+    @Test
+    public void testIdlePublish() throws Exception {
         Topology top = newTopology("testIdlePublish");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -1009,7 +927,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String pubClientId = newClientId(top.getName()+"_pub");
         String subClientId = newClientId(top.getName()+"_sub");
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
 
         // Exercise idle timeouts.  We won't have any direct
         // evidence that an idle disconnect/reconnect happen
@@ -1035,7 +953,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT, msgs.toArray(new String[0]));
     }
     
-    public void testConnectRetryPub(String msg1, String msg2) throws Exception {
+    @Test
+    public void testConnectRetryPub() throws Exception {
         Topology top = newTopology("testConnectRetryPub");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
@@ -1043,7 +962,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String pubClientId = newClientId(top.getName()+"_pub");
         String subClientId = newClientId(top.getName()+"_sub");
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
 
         // Exercise connection retry by first specifying
         // a bogus server url then a good one.
@@ -1083,7 +1002,8 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT + 5, msgs.toArray(new String[0]));
     }
     
-    public void testConnectRetrySub(String msg1, String msg2) throws Exception {
+    @Test
+    public void testConnectRetrySub() throws Exception {
         // Timing variances on shared machines can cause this test to fail
         assumeTrue(!Boolean.getBoolean("quarks.build.ci"));
 
@@ -1094,7 +1014,7 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         String pubClientId = newClientId(top.getName()+"_pub");
         String subClientId = newClientId(top.getName()+"_sub");
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
 
         // Exercise connection retry by first specifying
         // a bogus server url then a good one.
@@ -1145,14 +1065,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(subClientId, top, rcvd, mgen, SEC_TIMEOUT + 5, msgs.toArray(new String[0]));
     }
     
-    public void testSubscribeFnThrow(String msg1, String msg2) throws Exception {
+    @Test
+    public void testSubscribeFnThrow() throws Exception {
         Topology top = newTopology("testSubscribeFnThrow");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<String> expectMsgs = new ArrayList<>(msgs);
         expectMsgs.remove(0); // should only lose the 1st tuple
         
@@ -1186,14 +1107,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         completeAndValidate(clientId, top, rcvd, mgen, SEC_TIMEOUT, expectMsgs.toArray(new String[0]));
     }
     
-    public void testPublishFnThrow(String msg1, String msg2) throws Exception {
+    @Test
+    public void testPublishFnThrow() throws Exception {
         Topology top = newTopology("testPublishFnThrow");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         List<String> expectMsgs = new ArrayList<>(msgs);
         expectMsgs.remove(0); // should only lose the 1st tuple
         
@@ -1230,14 +1152,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
      * See mqtt/src/test/keystores/README for info about SSL/TLS and mosquitto
      */
     
-    public void testSsl(String msg1, String msg2) throws Exception {
+    @Test
+    public void testSsl() throws Exception {
         Topology top = newTopology("testSsl");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
@@ -1259,14 +1182,15 @@ public class MqttStreamsTestManual extends ConnectorTestBase {
         assertNotNull(sink);
     }
     
-    public void testSslClientAuth(String msg1, String msg2) throws Exception {
+    @Test
+    public void testSslClientAuth() throws Exception {
         Topology top = newTopology("testSslClientAuth");
         MsgGenerator mgen = new MsgGenerator(top.getName());
         int qos = 0;
         boolean retain = false;
         String clientId = newClientId(top.getName());
         String topic = getMqttTopics()[0];
-        List<String> msgs = createMsgs(mgen, topic, msg1, msg2);
+        List<String> msgs = createMsgs(mgen, topic, getMsg1(), getMsg2());
         
         TStream<String> s = PlumbingStreams.blockingOneShotDelay(
                 top.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
index be96335..2ba6c00 100644
--- a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
+++ b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubGlobalTest.java
@@ -18,8 +18,6 @@ under the License.
 */
 package quarks.test.connectors.pubsub;
 
-import org.junit.Test;
-
 /**
  * PubSub connector globalization tests.
  */
@@ -27,24 +25,8 @@ public class PubSubGlobalTest extends PubSubTest {
 
     private final String[] globalStrs = new String[] { "\u56db", "\u4e94", "\u516d" };
 
-    /**
-     * Test without a pub-sub service so no
-     * cross job connections will be made.
-     * @throws Exception
-     */
-    @Test
-    public void testGlobalNoService() throws Exception {
-        super.testNoService(globalStrs);
-    }
-
-    @Test(timeout=10000)
-    public void testStdProviderServiceSingleSubscriber() throws Exception {
-        super.testProviderServiceSingleSubscriber(globalStrs);
-    }
-
-    @Test(timeout=10000)
-    public void testStdProviderServiceMultipleSubscriber() throws Exception {
-        super.testProviderServiceMultipleSubscriber(globalStrs);
+    public String[] getStrs() {
+        return globalStrs;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
index 05ccc9a..68a9062 100644
--- a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
+++ b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
@@ -40,19 +40,8 @@ public class PubSubTest {
 
     private final String[] strs = new String[] { "A", "B", "C" };
 
-    @Test
-    public void testNoService() throws Exception {
-        testNoService(strs);
-    }
-
-    @Test(timeout=10000)
-    public void testProviderServiceSingleSubscriber() throws Exception {
-        testProviderServiceSingleSubscriber(strs);
-    }
-
-    @Test(timeout=10000)
-    public void testProviderServiceMultipleSubscriber() throws Exception {
-        testProviderServiceMultipleSubscriber(strs);
+    public String[] getStrs() {
+        return strs;
     }
 
     /**
@@ -60,10 +49,11 @@ public class PubSubTest {
      * cross job connections will be made.
      * @throws Exception
      */
-    public void testNoService(String[] strs) throws Exception {
+    @Test
+    public void testNoService() throws Exception {
         DirectProvider dp = new DirectProvider();
 
-        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
         Tester testPub = publishedStream.topology().getTester();
         Condition<Long> tcPub = testPub.tupleCount(publishedStream, 3);
 
@@ -95,18 +85,19 @@ public class PubSubTest {
         return PublishSubscribe.subscribe(subscriber, topic, streamType);     
     }
 
-    public void testProviderServiceSingleSubscriber(String[] strs) throws Exception {
+    @Test(timeout=10000)
+    public void testProviderServiceSingleSubscriber() throws Exception {
         DirectProvider dp = new DirectProvider();
 
         dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
 
-        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
         Tester testPub = publishedStream.topology().getTester();
-        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, strs);
+        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, getStrs());
 
         TStream<String> subscribedStream = createSubscriber(dp, "t1", String.class);
         Tester testSub = subscribedStream.topology().getTester();
-        Condition<List<String>> tcSub = testSub.streamContents(subscribedStream, strs); // Expect all tuples
+        Condition<List<String>> tcSub = testSub.streamContents(subscribedStream, getStrs()); // Expect all tuples
 
         Job js = dp.submit(subscribedStream.topology()).get();
         // Give the subscriber a chance to setup.
@@ -125,26 +116,27 @@ public class PubSubTest {
         jp.stateChange(Action.CLOSE);
     }
     
-    public void testProviderServiceMultipleSubscriber(String[] strs) throws Exception {
+    @Test(timeout=10000)
+    public void testProviderServiceMultipleSubscriber() throws Exception {
         DirectProvider dp = new DirectProvider();
 
         dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
 
-        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, strs);
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, getStrs());
         Tester testPub = publishedStream.topology().getTester();
-        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, strs);
+        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, getStrs());
         
         TStream<String> subscribedStream1 = createSubscriber(dp, "t1", String.class);
         Tester testSub1 = subscribedStream1.topology().getTester();
-        Condition<List<String>> tcSub1 = testSub1.streamContents(subscribedStream1, strs);
+        Condition<List<String>> tcSub1 = testSub1.streamContents(subscribedStream1, getStrs());
         
         TStream<String> subscribedStream2 = createSubscriber(dp, "t1", String.class);
         Tester testSub2 = subscribedStream2.topology().getTester();
-        Condition<List<String>> tcSub2 = testSub2.streamContents(subscribedStream2, strs);
+        Condition<List<String>> tcSub2 = testSub2.streamContents(subscribedStream2, getStrs());
         
         TStream<String> subscribedStream3 = createSubscriber(dp, "t1", String.class);
         Tester testSub3 = subscribedStream3.topology().getTester();
-        Condition<List<String>> tcSub3 = testSub3.streamContents(subscribedStream3, strs);
+        Condition<List<String>> tcSub3 = testSub3.streamContents(subscribedStream3, getStrs());
 
         
         Job js1 = dp.submit(subscribedStream1.topology()).get();

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3b66fac4/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
----------------------------------------------------------------------
diff --git a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
index b717579..bf7f343 100644
--- a/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
+++ b/connectors/wsclient-javax.websocket/src/test/java/quarks/tests/connectors/wsclient/javax/websocket/WebSocketClientGlobalTest.java
@@ -18,8 +18,6 @@ under the License.
 */
 package quarks.tests.connectors.wsclient.javax.websocket;
 
-import org.junit.Test;
-
 /**
  * WebSocketClient connector globalization tests.
  */
@@ -29,97 +27,20 @@ public class WebSocketClientGlobalTest extends WebSocketClientTest {
     private final static String globalStr3 = "\u4e09\u4e09";
     private final static String globalStr4 = "\u56db";
 
-    @Test
-    public void testGlobalBasicStaticStuff() {
-        super.testBasicStaticStuff(globalStr1, globalStr2);
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testGlobalTooManySendersNeg() {
-        super.testTooManySendersNeg(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalJson() throws Exception {
-        super.testJson(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalString() throws Exception {
-        super.testString(globalStr1, globalStr2);
+    public String getStr1() {
+        return globalStr1;
     }
 
-    @Test
-    public void testGlobalBytes() throws Exception {
-        super.testBytes(globalStr1, globalStr2);
+    public String getStr2() {
+        return globalStr2;
     }
 
-    @Test
-    public void testGlobalReconnect() throws Exception {
-        super.testReconnect(globalStr1, globalStr2, globalStr3, globalStr4);
+    public String getStr3() {
+        return globalStr3;
     }
 
-    @Test
-    public void testGlobalReconnectBytes() throws Exception {
-        super.testReconnectBytes(globalStr1, globalStr2, globalStr3, globalStr4);
+    public String getStr4() {
+        return globalStr4;
     }
 
-    @Test
-    public void testGlobalSslClientAuthSystemProperty() throws Exception {
-        super.testSslClientAuthSystemProperty(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSsl() throws Exception {
-        super.testSsl(globalStr1, globalStr2);
-    }
-
-     @Test
-     public void testGlobalSslReconnect() throws Exception {
-         super.testSslReconnect(globalStr1, globalStr2, globalStr3, globalStr4);
-     }
-
-    @Test
-    public void testGlobalSslNeg() throws Exception {
-        super.testSslNeg(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslClientAuth() throws Exception {
-        super.testSslClientAuth(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslClientAuthDefault() throws Exception {
-        super.testSslClientAuthDefault(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslClientAuthMy2ndCertNeg() throws Exception {
-        super.testSslClientAuthMy2ndCertNeg(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslClientAuthMy3rdCert() throws Exception {
-        super.testSslClientAuthMy3rdCert(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslClientAuthNeg() throws Exception {
-        super.testSslClientAuthNeg(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalPublicServer() throws Exception {
-        super.testPublicServer(globalStr1, globalStr2);
-    }
-
-    @Test
-    public void testGlobalSslPublicServer() throws Exception {
-        super.testSslPublicServer(globalStr1, globalStr2);
-    }
-
-    public void testGlobalSslPublicServerBadTrustStoreSystemPropertyNeg() throws Exception {
-        super.testSslPublicServerBadTrustStoreSystemPropertyNeg(globalStr1, globalStr2);
-    }
 }