Posted to commits@bahir.apache.org by lr...@apache.org on 2018/12/12 02:05:17 UTC

bahir git commit: [BAHIR-103] New module with common utilities and test classes

Repository: bahir
Updated Branches:
  refs/heads/master e0e49e23c -> 0601698c3


[BAHIR-103] New module with common utilities and test classes

Closes #73


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/0601698c
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/0601698c
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/0601698c

Branch: refs/heads/master
Commit: 0601698c3721fb3db58431683e556af28ffc0d6a
Parents: e0e49e2
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Mon Dec 3 08:52:10 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Dec 11 23:04:51 2018 -0300

----------------------------------------------------------------------
 common/pom.xml                                  | 78 ++++++++++++++++++++
 .../org/apache/bahir/utils/FileHelper.scala     | 46 ++++++++++++
 .../scala/org/apache/bahir/utils/Logging.scala  | 24 ++++++
 .../scala/org/apache/bahir/utils/Retry.scala    | 51 +++++++++++++
 .../apache/spark/ConditionalSparkFunSuite.scala | 45 +++++++++++
 .../streaming/LocalJavaStreamingContext.java    | 44 +++++++++++
 distribution/pom.xml                            | 22 +++++-
 pom.xml                                         | 10 +--
 sql-cloudant/pom.xml                            | 12 +++
 .../bahir/cloudant/ClientSparkFunSuite.scala    | 31 ++------
 .../bahir/cloudant/CloudantAllDocsDFSuite.scala | 14 ++--
 .../bahir/cloudant/CloudantChangesDFSuite.scala | 19 ++---
 .../bahir/cloudant/CloudantOptionSuite.scala    | 16 ++--
 .../bahir/cloudant/CloudantSparkSQLSuite.scala  |  8 +-
 .../org/apache/bahir/cloudant/TestUtils.scala   | 14 +---
 sql-streaming-akka/pom.xml                      |  5 ++
 .../org/apache/bahir/utils/BahirUtils.scala     | 47 ------------
 .../scala/org/apache/bahir/utils/Logging.scala  | 24 ------
 .../streaming/akka/AkkaStreamSourceSuite.scala  |  4 +-
 sql-streaming-mqtt/pom.xml                      |  5 ++
 .../sql/streaming/mqtt/CachedMQTTClient.scala   |  1 +
 .../sql/streaming/mqtt/MQTTStreamSink.scala     |  1 +
 .../bahir/sql/streaming/mqtt/MQTTUtils.scala    | 33 ---------
 .../org/apache/bahir/utils/BahirUtils.scala     | 48 ------------
 .../scala/org/apache/bahir/utils/Logging.scala  | 25 -------
 .../streaming/mqtt/LocalMessageStoreSuite.scala |  4 +-
 .../streaming/mqtt/MQTTStreamSinkSuite.scala    |  4 +-
 .../streaming/mqtt/MQTTStreamSourceSuite.scala  |  4 +-
 streaming-akka/pom.xml                          |  7 ++
 .../streaming/akka/JavaAkkaUtilsSuite.java      | 57 +++++++-------
 streaming-mqtt/pom.xml                          |  7 ++
 .../streaming/LocalJavaStreamingContext.java    | 44 -----------
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  |  1 +
 streaming-pubnub/pom.xml                        |  7 ++
 .../streaming/LocalJavaStreamingContext.java    | 43 -----------
 streaming-pubsub/pom.xml                        |  7 ++
 .../streaming/LocalJavaStreamingContext.java    | 44 -----------
 .../spark/streaming/pubsub/PubsubFunSuite.scala | 46 ------------
 .../streaming/pubsub/PubsubStreamSuite.scala    |  9 ++-
 .../streaming/pubsub/PubsubTestUtils.scala      |  5 +-
 streaming-twitter/pom.xml                       |  7 ++
 .../streaming/LocalJavaStreamingContext.java    | 44 -----------
 streaming-zeromq/pom.xml                        |  7 ++
 .../streaming/LocalJavaStreamingContext.java    | 44 -----------
 44 files changed, 457 insertions(+), 561 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..d7757bb
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bahir</groupId>
+    <artifactId>bahir-parent_2.11</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.bahir</groupId>
+  <artifactId>bahir-common_2.11</artifactId>
+  <properties>
+    <sbt.project.name>bahir-common</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Apache Bahir - Common</name>
+  <url>http://bahir.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
new file mode 100644
index 0000000..1800871
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
+object FileHelper extends Logging {
+  def deleteFileQuietly(file: File): Path = {
+    Files.walkFileTree(file.toPath, new SimpleFileVisitor[Path]() {
+      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+        try {
+          Files.delete(file)
+        } catch {
+          case t: Throwable => log.warn("Failed to delete", t)
+        }
+        FileVisitResult.CONTINUE
+      }
+
+      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+        try {
+          Files.delete(dir)
+        } catch {
+          case t: Throwable => log.warn("Failed to delete", t)
+        }
+        FileVisitResult.CONTINUE
+      }
+    })
+  }
+}
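
For context, a minimal sketch of how the new FileHelper could be exercised,
e.g. from a test teardown (the directory layout below is illustrative, not
taken from this commit):

    import java.io.File
    import java.nio.file.Files

    import org.apache.bahir.utils.FileHelper

    object FileHelperExample {
      def main(args: Array[String]): Unit = {
        // Build a scratch directory containing a single file.
        val dir = Files.createTempDirectory("bahir-test").toFile
        new File(dir, "data.tmp").createNewFile()
        // Walks the tree bottom-up, logging (not throwing) on delete failures.
        FileHelper.deleteFileQuietly(dir)
      }
    }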

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/Logging.scala b/common/src/main/scala/org/apache/bahir/utils/Logging.scala
new file mode 100644
index 0000000..776ed5a
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/Logging.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import org.slf4j.LoggerFactory
+
+trait Logging {
+  final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
+}
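
Mixing in the trait gives a class an slf4j logger named after the concrete
class, with the trailing "$" stripped for Scala objects. A small sketch (the
object name is hypothetical):

    import org.apache.bahir.utils.Logging

    object ConnectionManager extends Logging {
      // Logger name resolves to "ConnectionManager", not "ConnectionManager$".
      def connect(url: String): Unit = log.info(s"Connecting to $url")
    }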

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Retry.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/Retry.scala b/common/src/main/scala/org/apache/bahir/utils/Retry.scala
new file mode 100644
index 0000000..a5c429a
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/Retry.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+object Retry {
+  /**
+   * Retry invocation of the given code.
+   * @param attempts Number of attempts to execute the given code. -1 represents infinity.
+   * @param pauseMs Backoff duration between attempts, in milliseconds.
+   * @param retryExceptions Types of exceptions that trigger a retry.
+   * @param code Function to execute.
+   * @tparam A Result type of the executed function.
+   * @return Result of the function execution; the last exception is rethrown on failure.
+   */
+  def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = {
+    var result: Option[A] = None
+    var success = false
+    var remaining = attempts
+    while (!success) {
+      try {
+        remaining -= 1
+        result = Some(code)
+        success = true
+      }
+      catch {
+        case e: Exception =>
+          if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) {
+            Thread.sleep(pauseMs)
+          } else {
+            throw e
+          }
+      }
+    }
+    result.get
+  }
+}
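
For illustration, a minimal usage sketch of the new Retry helper: retry a
flaky call up to three times with a 100 ms pause, but only while an
IOException is thrown (the fetch() function below is hypothetical):

    import java.io.IOException

    import org.apache.bahir.utils.Retry

    object RetryExample {
      private var calls = 0

      // Stands in for a flaky network call: fails twice, then succeeds.
      private def fetch(): String = {
        calls += 1
        if (calls < 3) throw new IOException("transient failure")
        "payload"
      }

      def main(args: Array[String]): Unit = {
        // Succeeds on the third attempt; any exception type not listed
        // in retryExceptions would propagate immediately.
        val result = Retry(3, 100, classOf[IOException]) { fetch() }
        println(result)
      }
    }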

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
new file mode 100644
index 0000000..922ec5f
--- /dev/null
+++ b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+trait ConditionalSparkFunSuite extends SparkFunSuite {
+  /**
+   * Run the test if the given predicate is satisfied.
+   * @param testName Test name
+   * @param condition Predicate; if satisfied, the test is executed, otherwise it is ignored
+   * @param testBody Test body
+   */
+  def testIf(testName: String, condition: () => Boolean)(testBody: => Unit) {
+    if (condition()) {
+      test(testName)(testBody)
+    } else {
+      ignore(testName)(testBody)
+    }
+  }
+
+  /**
+   * Run the given code block only if the predicate is satisfied.
+   * @param condition Predicate; if satisfied, the code block is run
+   * @param body Code block
+   */
+  def runIf(condition: () => Boolean)(body: => Unit): Unit = {
+    if (condition()) {
+      body
+    }
+  }
+}
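
As a usage sketch (the suite and environment variable below are hypothetical),
a connector test suite can gate individual tests on external credentials and
have skipped tests reported as ignored rather than silently dropped:

    import org.apache.spark.ConditionalSparkFunSuite

    class ExampleServiceSuite extends ConditionalSparkFunSuite {
      private def credentialsPresent(): Boolean =
        sys.env.get("EXAMPLE_SERVICE_TOKEN").exists(_.nonEmpty)

      // Runs only when the token is set; otherwise reported as ignored.
      testIf("round-trips a record", credentialsPresent) {
        assert(1 + 1 == 2)
      }
    }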

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..012c7fb
--- /dev/null
+++ b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+    protected transient JavaStreamingContext ssc;
+
+    @Before
+    public void setUp() {
+        final SparkConf conf = new SparkConf()
+                .setMaster("local[2]")
+                .setAppName("test")
+                .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+        ssc = new JavaStreamingContext(conf, new Duration(1000));
+        ssc.checkpoint("checkpoint");
+    }
+
+    @After
+    public void tearDown() {
+        ssc.stop();
+        ssc = null;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ea1ed49..18ba854 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -34,6 +34,21 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.bahir</groupId>
+            <artifactId>spark-sql-cloudant_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
+            <artifactId>spark-sql-streaming-akka_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
+            <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
             <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -44,7 +59,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.bahir</groupId>
-            <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId>
+            <artifactId>spark-streaming-pubnub_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.bahir</groupId>
+            <artifactId>spark-streaming-pubsub_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52fb22c..34862ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,15 +75,16 @@
   </mailingLists>
 
   <modules>
+    <module>common</module>
     <module>sql-cloudant</module>
-    <module>streaming-akka</module>
     <module>sql-streaming-akka</module>
-    <module>streaming-mqtt</module>
     <module>sql-streaming-mqtt</module>
+    <module>streaming-akka</module>
+    <module>streaming-mqtt</module>
+    <module>streaming-pubnub</module>
+    <module>streaming-pubsub</module>
     <module>streaming-twitter</module>
     <module>streaming-zeromq</module>
-    <module>streaming-pubsub</module>
-    <module>streaming-pubnub</module>
   </modules>
 
   <properties>
@@ -107,7 +108,6 @@
     <!-- Streaming Akka connector -->
     <akka.group>com.typesafe.akka</akka.group>
     <akka.version>2.5.12</akka.version>
-    <akka_zeromq.version>2.3.16</akka_zeromq.version>
 
     <protobuf.version>2.5.0</protobuf.version>
     <jsr305.version>3.0.1</jsr305.version>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index d1a3be7..d81232a 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -36,6 +36,18 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>com.cloudant</groupId>
       <artifactId>cloudant-client</artifactId>
       <version>2.11.0</version>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
index aa8a48e..ed14f17 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
@@ -26,12 +26,12 @@ import com.cloudant.client.api.CloudantClient
 import com.google.gson.{Gson, JsonArray, JsonObject}
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{ConditionalSparkFunSuite, SparkConf}
 import org.apache.spark.sql.SparkSession
 
-import org.apache.bahir.cloudant.TestUtils.shouldRunTests
+import org.apache.bahir.utils.FileHelper
 
-class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
+class ClientSparkFunSuite extends ConditionalSparkFunSuite with BeforeAndAfter {
   private val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/sql-cloudant/")
 
   var client: CloudantClient = _
@@ -40,7 +40,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
   var spark: SparkSession = _
 
   override def beforeAll() {
-    runIfTestsEnabled("Prepare Cloudant test databases") {
+    runIf(TestUtils.shouldRunTest) {
       tempDir.mkdirs()
       tempDir.deleteOnExit()
       setupClient()
@@ -49,7 +49,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
 }
 
   override def afterAll() {
-    TestUtils.deleteRecursively(tempDir)
+    FileHelper.deleteFileQuietly(tempDir)
     deleteTestDbs()
     teardownClient()
     spark.close()
@@ -119,25 +119,4 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
   def deleteTestDb(dbName: String) {
     client.deleteDB(dbName)
   }
-
-  /** Run the test if environment variable is set or ignore the test */
-  def testIfEnabled(testName: String)(testBody: => Unit) {
-    if (shouldRunTests) {
-      test(testName)(testBody)
-    } else {
-      ignore(s"$testName [enable by setting env var CLOUDANT_USER and " +
-        s"CLOUDANT_PASSWORD]")(testBody)
-    }
-  }
-
-
-  /** Run the body of code only if tests are enabled */
-  def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
-    if (shouldRunTests) {
-      body
-    } else {
-      ignore(s"$message [enable by setting env var CLOUDANT_USER and " +
-        s"CLOUDANT_PASSWORD]")(())
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
index 982bbf9..635fa32 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
@@ -35,7 +35,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
       .getOrCreate()
   }
 
-  testIfEnabled("load and save data from Cloudant database") {
+  testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) {
     // Loading data from Cloudant db
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
     // Caching df in memory to speed computations
@@ -45,7 +45,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
     assert(df.count() == 1967)
   }
 
-  testIfEnabled("load and count data from Cloudant search index") {
+  testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("index", "_design/view/_search/n_flights").load("n_flight")
     val total = df.filter(df("flightSegmentId") >"AA9")
@@ -54,7 +54,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
     assert(total == 50)
   }
 
-  testIfEnabled("load data and count rows in filtered dataframe") {
+  testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) {
     // Loading data from Cloudant db
     val df = spark.read.format("org.apache.bahir.cloudant")
       .load("n_airportcodemapping")
@@ -63,7 +63,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
   }
 
   // save data to Cloudant test
-  testIfEnabled("save filtered dataframe to database") {
+  testIf("save filtered dataframe to database", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
 
     // Saving data frame with filter to Cloudant db
@@ -80,7 +80,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
   }
 
   // createDBOnSave option test
-  testIfEnabled("save dataframe to database using createDBOnSave=true option") {
+  testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .load("n_airportcodemapping")
 
@@ -106,13 +106,13 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
   }
 
   // view option tests
-  testIfEnabled("load and count data from view") {
+  testIf("load and count data from view", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("view", "_design/view/_view/AA0").load("n_flight")
     assert(df.count() == 5)
   }
 
-  testIfEnabled("load data from view with MapReduce function") {
+  testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("view", "_design/view/_view/AAreduce?reduce=true")
       .load("n_flight")

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
index 5e8f6f6..4210566 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
@@ -39,7 +39,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     spark.close()
   }
 
-  testIfEnabled("load and save data from Cloudant database") {
+  testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) {
     // Loading data from Cloudant db
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
     // Caching df in memory to speed computations
@@ -50,7 +50,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     assert(df.count() == 1967)
   }
 
-  testIfEnabled("load and count data from Cloudant search index") {
+  testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("index", "_design/view/_search/n_flights").load("n_flight")
     val total = df.filter(df("flightSegmentId") >"AA9")
@@ -59,7 +59,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     assert(total == 50)
   }
 
-  testIfEnabled("load data and verify deleted doc is not in results") {
+  testIf("load data and verify deleted doc is not in results", TestUtils.shouldRunTest) {
     val db = client.database("n_flight", false)
     // delete a saved doc to verify it's not included when loading data
     db.remove(deletedDoc.get("_id").getAsString, deletedDoc.get("_rev").getAsString)
@@ -71,7 +71,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
     assert(!df.columns.contains("_deleted"))
   }
 
-  testIfEnabled("load data and count rows in filtered dataframe") {
+  testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) {
     // Loading data from Cloudant db
     val df = spark.read.format("org.apache.bahir.cloudant")
       .load("n_airportcodemapping")
@@ -80,7 +80,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
   }
 
   // save data to Cloudant test
-  testIfEnabled("save filtered dataframe to database") {
+  testIf("save filtered dataframe to database", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
 
     // Saving data frame with filter to Cloudant db
@@ -97,7 +97,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
   }
 
   // createDBOnSave option test
-  testIfEnabled("save dataframe to database using createDBOnSave=true option") {
+  testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .load("n_airportcodemapping")
 
@@ -127,20 +127,21 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
   }
 
   // view option tests
-  testIfEnabled("load and count data from view") {
+  testIf("load and count data from view", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("view", "_design/view/_view/AA0").load("n_flight")
     assert(df.count() == 5)
   }
 
-  testIfEnabled("load data from view with MapReduce function") {
+  testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("view", "_design/view/_view/AAreduce?reduce=true")
       .load("n_flight")
     assert(df.count() == 1)
   }
 
-  testIfEnabled("load data and verify total count of selector, filter, and view option") {
+  testIf("load data and verify total count of selector, filter, and view option",
+      TestUtils.shouldRunTest) {
     val df = spark.read.format("org.apache.bahir.cloudant")
       .option("selector", "{\"flightSegmentId\": {\"$eq\": \"AA2\"}}")
       .load("n_flight")

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index c487937..4bc66e0 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -29,7 +29,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
     spark.close()
   }
 
-  testIfEnabled("invalid api receiver option throws an error message") {
+  testIf("invalid api receiver option throws an error message", TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", TestUtils.getUsername)
@@ -44,7 +44,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
       s"is invalid. Please supply the valid option '_all_docs' or '_changes'.")
   }
 
-  testIfEnabled("empty username option throws an error message") {
+  testIf("empty username option throws an error message", TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", "")
@@ -58,7 +58,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
       s"is empty. Please supply the required value.")
   }
 
-  testIfEnabled("empty password option throws an error message") {
+  testIf("empty password option throws an error message", TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", TestUtils.getUsername)
@@ -72,7 +72,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
       s"is empty. Please supply the required value.")
   }
 
-  testIfEnabled("empty databaseName throws an error message") {
+  testIf("empty databaseName throws an error message", TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", TestUtils.getUsername)
@@ -86,7 +86,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
       s"Please supply the required value.")
   }
 
-  testIfEnabled("incorrect password throws an error message for changes receiver") {
+  testIf("incorrect password throws an error message for changes receiver",
+      TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.protocol", TestUtils.getProtocol)
       .config("cloudant.host", TestUtils.getHost)
@@ -103,7 +104,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
       "\"reason\":\"Name or password is incorrect.\"}")
   }
 
-  testIfEnabled("string with valid value for cloudant.numberOfRetries option") {
+  testIf("string with valid value for cloudant.numberOfRetries option", TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", TestUtils.getUsername)
@@ -115,7 +116,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
     assert(df.count() === 2)
   }
 
-  testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") {
+  testIf("invalid value for cloudant.numberOfRetries option throws an error message",
+      TestUtils.shouldRunTest) {
     spark = SparkSession.builder().config(conf)
       .config("cloudant.host", TestUtils.getHost)
       .config("cloudant.username", TestUtils.getUsername)

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
index 41e5e89..9b70314 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
@@ -25,8 +25,6 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
     protected override def _sqlContext: SQLContext = spark.sqlContext
   }
 
-  import testImplicits._
-
   val endpoint = "_all_docs"
 
   override def beforeAll() {
@@ -40,8 +38,8 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
       .getOrCreate()
   }
 
-  testIfEnabled("verify results from temp view of database n_airportcodemapping") {
-
+  testIf("verify results from temp view of database n_airportcodemapping",
+      TestUtils.shouldRunTest) {
     // create a temp table from Cloudant db and query it using sql syntax
     val sparkSql = spark.sql(
       s"""
@@ -69,7 +67,7 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
     assert(df2count == airportData.count())
   }
 
-  testIfEnabled("verify results from temp view of index in n_flight") {
+  testIf("verify results from temp view of index in n_flight", TestUtils.shouldRunTest) {
     // create a temp table from Cloudant index  and query it using sql syntax
     val sparkSql = spark.sql(
       s"""

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
index dee6542..2904c25 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
@@ -17,8 +17,6 @@
 
 package org.apache.bahir.cloudant
 
-import java.io.File
-
 object TestUtils {
   // Set CouchDB/Cloudant host, username and password for local testing
   private val host = System.getenv("CLOUDANT_HOST")
@@ -37,15 +35,6 @@ object TestUtils {
     "n_flightsegment"
   )
 
-  def deleteRecursively(file: File): Unit = {
-    if (file.isDirectory) {
-      file.listFiles.foreach(deleteRecursively)
-    }
-    if (file.exists && !file.delete) {
-      throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
-    }
-  }
-
   // default value is https for cloudant.com accounts
   def getProtocol: String = {
     if (protocol != null && !protocol.isEmpty) {
@@ -71,12 +60,11 @@ object TestUtils {
     password
   }
 
-  lazy val shouldRunTests = {
+  def shouldRunTest(): Boolean = {
     val isEnvSet = (username != null && !username.isEmpty) &&
       (password != null && !password.isEmpty)
     if (isEnvSet) {
       // scalastyle:off println
-      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
       println(
         s"""
            |Sql-cloudant tests that require Cloudant databases have been enabled by

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
index 028b719..98586c7 100644
--- a/sql-streaming-akka/pom.xml
+++ b/sql-streaming-akka/pom.xml
@@ -36,6 +36,11 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.bahir</groupId>
+            <artifactId>bahir-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-tags_${scala.binary.version}</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
deleted file mode 100644
index 996a0a1..0000000
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import java.io.{File, IOException}
-import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
-import java.nio.file.attribute.BasicFileAttributes
-
-object BahirUtils extends Logging {
-
-  def recursiveDeleteDir(dir: File): Path = {
-    Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
-      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
-        try {
-          Files.delete(file)
-        } catch {
-          case t: Throwable => log.warn("Failed to delete", t)
-        }
-        FileVisitResult.CONTINUE
-      }
-
-      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
-        try {
-          Files.delete(dir)
-        } catch {
-          case t: Throwable => log.warn("Failed to delete", t)
-        }
-        FileVisitResult.CONTINUE
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
deleted file mode 100644
index 776ed5a..0000000
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import org.slf4j.LoggerFactory
-
-trait Logging {
-  final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
index f61b067..b04ed3c 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.types.StructType
 
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
 
 class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -50,7 +50,7 @@ class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
 
   after {
     Persistence.close()
-    BahirUtils.recursiveDeleteDir(tempDir)
+    FileHelper.deleteFileQuietly(tempDir)
   }
 
   protected val tmpDir: String = tempDir.getAbsolutePath

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml
index 242f24b..63497dc 100644
--- a/sql-streaming-mqtt/pom.xml
+++ b/sql-streaming-mqtt/pom.xml
@@ -36,6 +36,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
index f825eea..fed2601 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.SparkEnv
 
 import org.apache.bahir.utils.Logging
+import org.apache.bahir.utils.Retry
 
 
 private[mqtt] object CachedMQTTClient extends Logging {

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
index 8654b88..f449e57 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 import org.apache.bahir.utils.Logging
+import org.apache.bahir.utils.Retry
 
 
 class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions)

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
index 79fe7a2..a615d28 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
@@ -82,36 +82,3 @@ private[mqtt] object MQTTUtils extends Logging {
     (brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
   }
 }
-
-private[mqtt] object Retry {
-  /**
-   * Retry invocation of given code.
-   * @param attempts Number of attempts to try executing given code. -1 represents infinity.
-   * @param pauseMs Number of backoff milliseconds.
-   * @param retryExceptions Types of exceptions to retry.
-   * @param code Function to execute.
-   * @tparam A Type parameter.
-   * @return Returns result of function execution or exception in case of failure.
-   */
-  def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = {
-    var result: Option[A] = None
-    var success = false
-    var remaining = attempts
-    while ( ! success ) {
-      try {
-        remaining -= 1
-        result = Some( code )
-        success = true
-      }
-      catch {
-        case e: Exception =>
-          if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) {
-            Thread.sleep(pauseMs)
-          } else {
-            throw e
-          }
-      }
-    }
-    result.get
-  }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
deleted file mode 100644
index 3d27b06..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import java.io.{File, IOException}
-import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
-import java.nio.file.attribute.BasicFileAttributes
-
-object BahirUtils extends Logging {
-
-  def recursiveDeleteDir(dir: File): Path = {
-    Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
-      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
-        try {
-          Files.delete(file)
-        } catch {
-          case t: Throwable => log.warn("Failed to delete", t)
-        }
-        FileVisitResult.CONTINUE
-      }
-
-      override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
-        try {
-          Files.delete(dir)
-        } catch {
-          case t: Throwable => log.warn("Failed to delete", t)
-        }
-        FileVisitResult.CONTINUE
-      }
-    })
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
deleted file mode 100644
index cbe97e9..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import org.slf4j.LoggerFactory
-
-
-trait Logging {
-  final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
index d1bbe18..0b6b80b 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkFunSuite
 
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
 
 
 class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter {
@@ -48,7 +48,7 @@ class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter {
   after {
     persistence.clear()
     persistence.close()
-    BahirUtils.recursiveDeleteDir(tempDir)
+    FileHelper.deleteFileQuietly(tempDir)
   }
 
   test("serialize and deserialize") {

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
index 14ea962..d72ba17 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
 
 
 class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
@@ -57,7 +57,7 @@ class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with Bef
     testClient.disconnect()
     testClient.close()
     mqttTestUtils.teardown()
-    BahirUtils.recursiveDeleteDir(tempDir)
+    FileHelper.deleteFileQuietly(tempDir)
   }
 
   protected def createContextAndDF(messages: String*): (SQLContext, DataFrame) = {

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index bb82715..a7eb770 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
 
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
 
 class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
 
@@ -50,7 +50,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B
 
   after {
     mqttTestUtils.teardown()
-    BahirUtils.recursiveDeleteDir(tempDir)
+    FileHelper.deleteFileQuietly(tempDir)
   }
 
   protected val tmpDir: String = tempDir.getAbsolutePath

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
index 7929f33..5b94c7a 100644
--- a/streaming-akka/pom.xml
+++ b/streaming-akka/pom.xml
@@ -36,6 +36,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

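The same test-scoped dependency on the bahir-common test-jar is added to every consuming
module below, so shared scaffolding such as LocalJavaStreamingContext is now maintained in
one place. As a hypothetical illustration (not part of this commit), a downstream Scala
test only needs to extend the shared base class pulled in through that test-jar:

    import org.junit.Test

    import org.apache.spark.streaming.LocalJavaStreamingContext

    // Hypothetical smoke test: the shared base class creates `ssc`
    // in its @Before method and stops it in @After.
    class SharedContextSmokeTest extends LocalJavaStreamingContext {
      @Test
      def contextIsProvided(): Unit = {
        assert(ssc != null)
      }
    }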
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
index 4a6d578..1c8fd78 100644
--- a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -21,50 +21,45 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.util.Timeout;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.junit.Test;
 
 import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 
 import java.util.concurrent.TimeUnit;
 
-public class JavaAkkaUtilsSuite {
-
-  @Test // tests the API, does not actually test data receiving
-  public void testAkkaUtils() {
-    JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
-    try {
-      JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
-        jsc, Props.create(JavaTestActor.class), "test");
-      JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
-        jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
-      JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
-        jsc,
-        Props.create(JavaTestActor.class),
-        "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
-        new ActorSystemCreatorForTest(),
-        SupervisorStrategy.defaultStrategy());
-    } finally {
-      jsc.stop();
+public class JavaAkkaUtilsSuite extends LocalJavaStreamingContext {
+    @Test
+    public void testAkkaUtils() {
+        // tests the API, does not actually test data receiving
+        JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+                ssc, Props.create(JavaTestActor.class), "test"
+        );
+        JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+                ssc, Props.create(JavaTestActor.class), "test",
+                StorageLevel.MEMORY_AND_DISK_SER_2()
+        );
+        JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+                ssc, Props.create(JavaTestActor.class), "test",
+                StorageLevel.MEMORY_AND_DISK_SER_2(), new ActorSystemCreatorForTest(),
+                SupervisorStrategy.defaultStrategy()
+        );
     }
-  }
 }
 
 class ActorSystemCreatorForTest implements Function0<ActorSystem> {
-  @Override
-  public ActorSystem call() {
-    return null;
-  }
+    @Override
+    public ActorSystem call() {
+        return null;
+    }
 }
 
-
 class JavaTestActor extends JavaActorReceiver {
-  @Override
-  public void onReceive(Object message) throws Exception {
-    store((String) message);
-    store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS));
-  }
+    @Override
+    public void onReceive(Object message) throws Exception {
+        store((String) message);
+        store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS));
+    }
 }

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml
index f23aa83..0f6d809 100644
--- a/streaming-mqtt/pom.xml
+++ b/streaming-mqtt/pom.xml
@@ -36,6 +36,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 6ef551b..d86aa98 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -77,6 +77,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
     }
     ssc.stop()
   }
+
   test("mqtt input stream2") {
     val sendMessage1 = "MQTT demo for spark streaming1"
     val sendMessage2 = "MQTT demo for spark streaming2"

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
index ac0b925..464cfce 100644
--- a/streaming-pubnub/pom.xml
+++ b/streaming-pubnub/pom.xml
@@ -35,6 +35,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index 448fb5e..0000000
--- a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
index 0885152..f6ecd37 100644
--- a/streaming-pubsub/pom.xml
+++ b/streaming-pubsub/pom.xml
@@ -36,6 +36,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
deleted file mode 100644
index acdceb7..0000000
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.pubsub
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * Helper class that runs Google Cloud Pub/Sub real data transfer tests or
- * ignores them based on whether an env variable is set.
- */
-trait PubsubFunSuite extends SparkFunSuite {
-  import PubsubTestUtils._
-
-  /** Run the test if the environment variable is set, otherwise ignore it */
-  def testIfEnabled(testName: String)(testBody: => Unit) {
-    if (shouldRunTests) {
-      test(testName)(testBody)
-    } else {
-      ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
-    }
-  }
-
-  /** Run the given body of code only if Pub/Sub tests are enabled */
-  def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
-    if (shouldRunTests) {
-      body
-    } else {
-      ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
index d91c7e6..8f499cb 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -25,11 +25,12 @@ import scala.language.postfixOps
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
+import org.apache.spark.ConditionalSparkFunSuite
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Seconds
 import org.apache.spark.streaming.StreamingContext
 
-class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter {
+class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with BeforeAndAfter {
 
   val batchDuration = Seconds(1)
 
@@ -50,7 +51,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
   private var subForCreateFullName: String = null
 
   override def beforeAll(): Unit = {
-    runIfTestsEnabled("Prepare PubsubTestUtils") {
+    runIf(PubsubTestUtils.shouldRunTest) {
       pubsubTestUtils = new PubsubTestUtils
       topicFullName = pubsubTestUtils.getFullTopicPath(topicName)
       subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName)
@@ -88,7 +89,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
   }
 
-  testIfEnabled("pubsub input stream") {
+  testIf("pubsub input stream", PubsubTestUtils.shouldRunTest) {
     val receiveStream = PubsubUtils.createStream(
       ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
@@ -112,7 +113,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
     }
   }
 
-  testIfEnabled("pubsub input stream, create pubsub") {
+  testIf("pubsub input stream, create pubsub", PubsubTestUtils.shouldRunTest) {
     val receiveStream = PubsubUtils.createStream(
       ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
       PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)

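testIf and runIf come from the new shared ConditionalSparkFunSuite added under
common/src/test/scala/org/apache/spark/. Its contract can be inferred from the call sites
above; a rough sketch, with the caveat that the committed implementation may differ in
detail:

    package org.apache.spark

    // Sketch only. It has to live in the org.apache.spark package because
    // SparkFunSuite is package-private to Spark. A test is registered (or a
    // block run) only when `condition` holds; otherwise the test is reported
    // as ignored rather than silently passed.
    trait ConditionalSparkFunSuite extends SparkFunSuite {
      def testIf(testName: String, condition: () => Boolean)(testBody: => Unit): Unit = {
        if (condition()) test(testName)(testBody) else ignore(testName)(testBody)
      }

      def runIf(condition: () => Boolean)(body: => Unit): Unit = {
        if (condition()) body
      }
    }

At the call sites, PubsubTestUtils.shouldRunTest is eta-expanded to the expected
() => Boolean, so the environment check runs when each suite registers its tests.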
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
index 9dd719a..39597ca 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -101,11 +101,10 @@ private[pubsub] object PubsubTestUtils {
   val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH"
   val envVarNameForAccount = "GCP_TEST_ACCOUNT"
 
-  lazy val shouldRunTests = {
-    val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+  def shouldRunTest(): Boolean = {
+    val isEnvSet = sys.env.get(envVarNameForEnablingTests).contains("1")
     if (isEnvSet) {
       // scalastyle:off println
-      // Print this so that they are easily visible on the console and not hidden in the log4j logs.
       println(
         s"""
            |Google Pub/Sub tests that actually send data have been enabled by setting the environment

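The move from == Some("1") to .contains("1") is behavior-preserving: Option.contains
(available since Scala 2.11) returns true exactly when the option is defined and equal to
the argument, and it reads as a predicate rather than an allocation-and-compare. A quick
illustration with a stand-in for sys.env:

    // sys.env is a Map[String, String]; a stand-in map for illustration:
    val env = Map("SOME_FLAG" -> "1")

    env.get("SOME_FLAG").contains("1")     // true
    env.get("SOME_FLAG") == Some("1")      // also true, but builds a Some to compare
    env.get("MISSING_FLAG").contains("1")  // false: absent key is handled cleanly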
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml
index 1f18100..2bf29b5 100644
--- a/streaming-twitter/pom.xml
+++ b/streaming-twitter/pom.xml
@@ -36,6 +36,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index 92115f2..5aaf2fa 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -36,6 +36,13 @@
 
   <dependencies>
     <dependency>
+      <groupId>org.apache.bahir</groupId>
+      <artifactId>bahir-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index f9cee96..0000000
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-    protected transient JavaStreamingContext ssc;
-
-    @Before
-    public void setUp() {
-        final SparkConf conf = new SparkConf()
-            .setMaster("local[2]")
-            .setAppName("test")
-            .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
-        ssc = new JavaStreamingContext(conf, new Duration(1000));
-        ssc.checkpoint("checkpoint");
-    }
-
-    @After
-    public void tearDown() {
-        ssc.stop();
-        ssc = null;
-    }
-}
-