You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2018/12/12 02:05:17 UTC
bahir git commit: [BAHIR-103] New module with common utilities and
test classes
Repository: bahir
Updated Branches:
refs/heads/master e0e49e23c -> 0601698c3
[BAHIR-103] New module with common utilities and test classes
Closes #73
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/0601698c
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/0601698c
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/0601698c
Branch: refs/heads/master
Commit: 0601698c3721fb3db58431683e556af28ffc0d6a
Parents: e0e49e2
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Mon Dec 3 08:52:10 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Dec 11 23:04:51 2018 -0300
----------------------------------------------------------------------
common/pom.xml | 78 ++++++++++++++++++++
.../org/apache/bahir/utils/FileHelper.scala | 46 ++++++++++++
.../scala/org/apache/bahir/utils/Logging.scala | 24 ++++++
.../scala/org/apache/bahir/utils/Retry.scala | 51 +++++++++++++
.../apache/spark/ConditionalSparkFunSuite.scala | 45 +++++++++++
.../streaming/LocalJavaStreamingContext.java | 44 +++++++++++
distribution/pom.xml | 22 +++++-
pom.xml | 10 +--
sql-cloudant/pom.xml | 12 +++
.../bahir/cloudant/ClientSparkFunSuite.scala | 31 ++------
.../bahir/cloudant/CloudantAllDocsDFSuite.scala | 14 ++--
.../bahir/cloudant/CloudantChangesDFSuite.scala | 19 ++---
.../bahir/cloudant/CloudantOptionSuite.scala | 16 ++--
.../bahir/cloudant/CloudantSparkSQLSuite.scala | 8 +-
.../org/apache/bahir/cloudant/TestUtils.scala | 14 +---
sql-streaming-akka/pom.xml | 5 ++
.../org/apache/bahir/utils/BahirUtils.scala | 47 ------------
.../scala/org/apache/bahir/utils/Logging.scala | 24 ------
.../streaming/akka/AkkaStreamSourceSuite.scala | 4 +-
sql-streaming-mqtt/pom.xml | 5 ++
.../sql/streaming/mqtt/CachedMQTTClient.scala | 1 +
.../sql/streaming/mqtt/MQTTStreamSink.scala | 1 +
.../bahir/sql/streaming/mqtt/MQTTUtils.scala | 33 ---------
.../org/apache/bahir/utils/BahirUtils.scala | 48 ------------
.../scala/org/apache/bahir/utils/Logging.scala | 25 -------
.../streaming/mqtt/LocalMessageStoreSuite.scala | 4 +-
.../streaming/mqtt/MQTTStreamSinkSuite.scala | 4 +-
.../streaming/mqtt/MQTTStreamSourceSuite.scala | 4 +-
streaming-akka/pom.xml | 7 ++
.../streaming/akka/JavaAkkaUtilsSuite.java | 57 +++++++-------
streaming-mqtt/pom.xml | 7 ++
.../streaming/LocalJavaStreamingContext.java | 44 -----------
.../spark/streaming/mqtt/MQTTStreamSuite.scala | 1 +
streaming-pubnub/pom.xml | 7 ++
.../streaming/LocalJavaStreamingContext.java | 43 -----------
streaming-pubsub/pom.xml | 7 ++
.../streaming/LocalJavaStreamingContext.java | 44 -----------
.../spark/streaming/pubsub/PubsubFunSuite.scala | 46 ------------
.../streaming/pubsub/PubsubStreamSuite.scala | 9 ++-
.../streaming/pubsub/PubsubTestUtils.scala | 5 +-
streaming-twitter/pom.xml | 7 ++
.../streaming/LocalJavaStreamingContext.java | 44 -----------
streaming-zeromq/pom.xml | 7 ++
.../streaming/LocalJavaStreamingContext.java | 44 -----------
44 files changed, 457 insertions(+), 561 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..d7757bb
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-parent_2.11</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_2.11</artifactId>
+ <properties>
+ <sbt.project.name>bahir-common</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Apache Bahir - Common</name>
+ <url>http://bahir.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
new file mode 100644
index 0000000..1800871
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/FileHelper.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import java.io.{File, IOException}
+import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
+import java.nio.file.attribute.BasicFileAttributes
+
+object FileHelper extends Logging {
+ def deleteFileQuietly(file: File): Path = {
+ Files.walkFileTree(file.toPath, new SimpleFileVisitor[Path]() {
+ override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+ try {
+ Files.delete(file)
+ } catch {
+ case t: Throwable => log.warn("Failed to delete", t)
+ }
+ FileVisitResult.CONTINUE
+ }
+
+ override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
+ try {
+ Files.delete(dir)
+ } catch {
+ case t: Throwable => log.warn("Failed to delete", t)
+ }
+ FileVisitResult.CONTINUE
+ }
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/Logging.scala b/common/src/main/scala/org/apache/bahir/utils/Logging.scala
new file mode 100644
index 0000000..776ed5a
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/Logging.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+import org.slf4j.LoggerFactory
+
+trait Logging {
+ final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/main/scala/org/apache/bahir/utils/Retry.scala
----------------------------------------------------------------------
diff --git a/common/src/main/scala/org/apache/bahir/utils/Retry.scala b/common/src/main/scala/org/apache/bahir/utils/Retry.scala
new file mode 100644
index 0000000..a5c429a
--- /dev/null
+++ b/common/src/main/scala/org/apache/bahir/utils/Retry.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.utils
+
+object Retry {
+ /**
+ * Retry invocation of given code.
+ * @param attempts Number of attempts to try executing given code. -1 represents infinity.
+ * @param pauseMs Number of backoff milliseconds.
+ * @param retryExceptions Types of exceptions to retry.
+ * @param code Function to execute.
+ * @tparam A Type parameter.
+ * @return Returns result of function execution or exception in case of failure.
+ */
+ def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = {
+ var result: Option[A] = None
+ var success = false
+ var remaining = attempts
+ while (!success) {
+ try {
+ remaining -= 1
+ result = Some(code)
+ success = true
+ }
+ catch {
+ case e: Exception =>
+ if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) {
+ Thread.sleep(pauseMs)
+ } else {
+ throw e
+ }
+ }
+ }
+ result.get
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
new file mode 100644
index 0000000..922ec5f
--- /dev/null
+++ b/common/src/test/java/org/apache/spark/ConditionalSparkFunSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+trait ConditionalSparkFunSuite extends SparkFunSuite {
+ /**
+ * Run test if given predicate is satisfied.
+ * @param testName Test name
+ * @param condition If satisfied, test will be executed
+ * @param testBody Test body
+ */
+ def testIf(testName: String, condition: () => Boolean)(testBody: => Unit) {
+ if (condition()) {
+ test(testName)(testBody)
+ } else {
+ ignore(testName)(testBody)
+ }
+ }
+
+ /**
+ * Run given code only if predicate has been satisfied.
+ * @param condition If satisfied, run code block
+ * @param body Code block
+ */
+ def runIf(condition: () => Boolean)(body: => Unit): Unit = {
+ if (condition()) {
+ body
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
new file mode 100644
index 0000000..012c7fb
--- /dev/null
+++ b/common/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.After;
+import org.junit.Before;
+
+public abstract class LocalJavaStreamingContext {
+ protected transient JavaStreamingContext ssc;
+
+ @Before
+ public void setUp() {
+ final SparkConf conf = new SparkConf()
+ .setMaster("local[2]")
+ .setAppName("test")
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
+ ssc = new JavaStreamingContext(conf, new Duration(1000));
+ ssc.checkpoint("checkpoint");
+ }
+
+ @After
+ public void tearDown() {
+ ssc.stop();
+ ssc = null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ea1ed49..18ba854 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -34,6 +34,21 @@
<dependencies>
<dependency>
<groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-cloudant_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
@@ -44,7 +59,12 @@
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-sql-streaming-mqtt_${scala.binary.version}</artifactId>
+ <artifactId>spark-streaming-pubnub_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>spark-streaming-pubsub_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52fb22c..34862ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,15 +75,16 @@
</mailingLists>
<modules>
+ <module>common</module>
<module>sql-cloudant</module>
- <module>streaming-akka</module>
<module>sql-streaming-akka</module>
- <module>streaming-mqtt</module>
<module>sql-streaming-mqtt</module>
+ <module>streaming-akka</module>
+ <module>streaming-mqtt</module>
+ <module>streaming-pubnub</module>
+ <module>streaming-pubsub</module>
<module>streaming-twitter</module>
<module>streaming-zeromq</module>
- <module>streaming-pubsub</module>
- <module>streaming-pubnub</module>
</modules>
<properties>
@@ -107,7 +108,6 @@
<!-- Streaming Akka connector -->
<akka.group>com.typesafe.akka</akka.group>
<akka.version>2.5.12</akka.version>
- <akka_zeromq.version>2.3.16</akka_zeromq.version>
<protobuf.version>2.5.0</protobuf.version>
<jsr305.version>3.0.1</jsr305.version>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index d1a3be7..d81232a 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -36,6 +36,18 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.cloudant</groupId>
<artifactId>cloudant-client</artifactId>
<version>2.11.0</version>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
index aa8a48e..ed14f17 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/ClientSparkFunSuite.scala
@@ -26,12 +26,12 @@ import com.cloudant.client.api.CloudantClient
import com.google.gson.{Gson, JsonArray, JsonObject}
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{ConditionalSparkFunSuite, SparkConf}
import org.apache.spark.sql.SparkSession
-import org.apache.bahir.cloudant.TestUtils.shouldRunTests
+import org.apache.bahir.utils.FileHelper
-class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
+class ClientSparkFunSuite extends ConditionalSparkFunSuite with BeforeAndAfter {
private val tempDir: File = new File(System.getProperty("java.io.tmpdir") + "/sql-cloudant/")
var client: CloudantClient = _
@@ -40,7 +40,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
var spark: SparkSession = _
override def beforeAll() {
- runIfTestsEnabled("Prepare Cloudant test databases") {
+ runIf(TestUtils.shouldRunTest) {
tempDir.mkdirs()
tempDir.deleteOnExit()
setupClient()
@@ -49,7 +49,7 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
}
override def afterAll() {
- TestUtils.deleteRecursively(tempDir)
+ FileHelper.deleteFileQuietly(tempDir)
deleteTestDbs()
teardownClient()
spark.close()
@@ -119,25 +119,4 @@ class ClientSparkFunSuite extends SparkFunSuite with BeforeAndAfter {
def deleteTestDb(dbName: String) {
client.deleteDB(dbName)
}
-
- /** Run the test if environment variable is set or ignore the test */
- def testIfEnabled(testName: String)(testBody: => Unit) {
- if (shouldRunTests) {
- test(testName)(testBody)
- } else {
- ignore(s"$testName [enable by setting env var CLOUDANT_USER and " +
- s"CLOUDANT_PASSWORD]")(testBody)
- }
- }
-
-
- /** Run the body of code only if tests are enabled */
- def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
- if (shouldRunTests) {
- body
- } else {
- ignore(s"$message [enable by setting env var CLOUDANT_USER and " +
- s"CLOUDANT_PASSWORD]")(())
- }
- }
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
index 982bbf9..635fa32 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantAllDocsDFSuite.scala
@@ -35,7 +35,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
.getOrCreate()
}
- testIfEnabled("load and save data from Cloudant database") {
+ testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) {
// Loading data from Cloudant db
val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
// Caching df in memory to speed computations
@@ -45,7 +45,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
assert(df.count() == 1967)
}
- testIfEnabled("load and count data from Cloudant search index") {
+ testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("index", "_design/view/_search/n_flights").load("n_flight")
val total = df.filter(df("flightSegmentId") >"AA9")
@@ -54,7 +54,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
assert(total == 50)
}
- testIfEnabled("load data and count rows in filtered dataframe") {
+ testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) {
// Loading data from Cloudant db
val df = spark.read.format("org.apache.bahir.cloudant")
.load("n_airportcodemapping")
@@ -63,7 +63,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
}
// save data to Cloudant test
- testIfEnabled("save filtered dataframe to database") {
+ testIf("save filtered dataframe to database", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
// Saving data frame with filter to Cloudant db
@@ -80,7 +80,7 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
}
// createDBOnSave option test
- testIfEnabled("save dataframe to database using createDBOnSave=true option") {
+ testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.load("n_airportcodemapping")
@@ -106,13 +106,13 @@ class CloudantAllDocsDFSuite extends ClientSparkFunSuite {
}
// view option tests
- testIfEnabled("load and count data from view") {
+ testIf("load and count data from view", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("view", "_design/view/_view/AA0").load("n_flight")
assert(df.count() == 5)
}
- testIfEnabled("load data from view with MapReduce function") {
+ testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("view", "_design/view/_view/AAreduce?reduce=true")
.load("n_flight")
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
index 5e8f6f6..4210566 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantChangesDFSuite.scala
@@ -39,7 +39,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
spark.close()
}
- testIfEnabled("load and save data from Cloudant database") {
+ testIf("load and save data from Cloudant database", TestUtils.shouldRunTest) {
// Loading data from Cloudant db
val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
// Caching df in memory to speed computations
@@ -50,7 +50,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
assert(df.count() == 1967)
}
- testIfEnabled("load and count data from Cloudant search index") {
+ testIf("load and count data from Cloudant search index", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("index", "_design/view/_search/n_flights").load("n_flight")
val total = df.filter(df("flightSegmentId") >"AA9")
@@ -59,7 +59,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
assert(total == 50)
}
- testIfEnabled("load data and verify deleted doc is not in results") {
+ testIf("load data and verify deleted doc is not in results", TestUtils.shouldRunTest) {
val db = client.database("n_flight", false)
// delete a saved doc to verify it's not included when loading data
db.remove(deletedDoc.get("_id").getAsString, deletedDoc.get("_rev").getAsString)
@@ -71,7 +71,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
assert(!df.columns.contains("_deleted"))
}
- testIfEnabled("load data and count rows in filtered dataframe") {
+ testIf("load data and count rows in filtered dataframe", TestUtils.shouldRunTest) {
// Loading data from Cloudant db
val df = spark.read.format("org.apache.bahir.cloudant")
.load("n_airportcodemapping")
@@ -80,7 +80,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
}
// save data to Cloudant test
- testIfEnabled("save filtered dataframe to database") {
+ testIf("save filtered dataframe to database", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
// Saving data frame with filter to Cloudant db
@@ -97,7 +97,7 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
}
// createDBOnSave option test
- testIfEnabled("save dataframe to database using createDBOnSave=true option") {
+ testIf("save dataframe to database using createDBOnSave=true option", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.load("n_airportcodemapping")
@@ -127,20 +127,21 @@ class CloudantChangesDFSuite extends ClientSparkFunSuite {
}
// view option tests
- testIfEnabled("load and count data from view") {
+ testIf("load and count data from view", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("view", "_design/view/_view/AA0").load("n_flight")
assert(df.count() == 5)
}
- testIfEnabled("load data from view with MapReduce function") {
+ testIf("load data from view with MapReduce function", TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("view", "_design/view/_view/AAreduce?reduce=true")
.load("n_flight")
assert(df.count() == 1)
}
- testIfEnabled("load data and verify total count of selector, filter, and view option") {
+ testIf("load data and verify total count of selector, filter, and view option",
+ TestUtils.shouldRunTest) {
val df = spark.read.format("org.apache.bahir.cloudant")
.option("selector", "{\"flightSegmentId\": {\"$eq\": \"AA2\"}}")
.load("n_flight")
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
index c487937..4bc66e0 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantOptionSuite.scala
@@ -29,7 +29,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
spark.close()
}
- testIfEnabled("invalid api receiver option throws an error message") {
+ testIf("invalid api receiver option throws an error message", TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", TestUtils.getUsername)
@@ -44,7 +44,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
s"is invalid. Please supply the valid option '_all_docs' or '_changes'.")
}
- testIfEnabled("empty username option throws an error message") {
+ testIf("empty username option throws an error message", TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", "")
@@ -58,7 +58,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
s"is empty. Please supply the required value.")
}
- testIfEnabled("empty password option throws an error message") {
+ testIf("empty password option throws an error message", TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", TestUtils.getUsername)
@@ -72,7 +72,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
s"is empty. Please supply the required value.")
}
- testIfEnabled("empty databaseName throws an error message") {
+ testIf("empty databaseName throws an error message", TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", TestUtils.getUsername)
@@ -86,7 +86,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
s"Please supply the required value.")
}
- testIfEnabled("incorrect password throws an error message for changes receiver") {
+ testIf("incorrect password throws an error message for changes receiver",
+ TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.protocol", TestUtils.getProtocol)
.config("cloudant.host", TestUtils.getHost)
@@ -103,7 +104,7 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
"\"reason\":\"Name or password is incorrect.\"}")
}
- testIfEnabled("string with valid value for cloudant.numberOfRetries option") {
+ testIf("string with valid value for cloudant.numberOfRetries option", TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", TestUtils.getUsername)
@@ -115,7 +116,8 @@ class CloudantOptionSuite extends ClientSparkFunSuite with BeforeAndAfter {
assert(df.count() === 2)
}
- testIfEnabled("invalid value for cloudant.numberOfRetries option throws an error message") {
+ testIf("invalid value for cloudant.numberOfRetries option throws an error message",
+ TestUtils.shouldRunTest) {
spark = SparkSession.builder().config(conf)
.config("cloudant.host", TestUtils.getHost)
.config("cloudant.username", TestUtils.getUsername)
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
index 41e5e89..9b70314 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/CloudantSparkSQLSuite.scala
@@ -25,8 +25,6 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
protected override def _sqlContext: SQLContext = spark.sqlContext
}
- import testImplicits._
-
val endpoint = "_all_docs"
override def beforeAll() {
@@ -40,8 +38,8 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
.getOrCreate()
}
- testIfEnabled("verify results from temp view of database n_airportcodemapping") {
-
+ testIf("verify results from temp view of database n_airportcodemapping",
+ TestUtils.shouldRunTest) {
// create a temp table from Cloudant db and query it using sql syntax
val sparkSql = spark.sql(
s"""
@@ -69,7 +67,7 @@ class CloudantSparkSQLSuite extends ClientSparkFunSuite {
assert(df2count == airportData.count())
}
- testIfEnabled("verify results from temp view of index in n_flight") {
+ testIf("verify results from temp view of index in n_flight", TestUtils.shouldRunTest) {
// create a temp table from Cloudant index and query it using sql syntax
val sparkSql = spark.sql(
s"""
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
index dee6542..2904c25 100644
--- a/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
+++ b/sql-cloudant/src/test/scala/org/apache/bahir/cloudant/TestUtils.scala
@@ -17,8 +17,6 @@
package org.apache.bahir.cloudant
-import java.io.File
-
object TestUtils {
// Set CouchDB/Cloudant host, username and password for local testing
private val host = System.getenv("CLOUDANT_HOST")
@@ -37,15 +35,6 @@ object TestUtils {
"n_flightsegment"
)
- def deleteRecursively(file: File): Unit = {
- if (file.isDirectory) {
- file.listFiles.foreach(deleteRecursively)
- }
- if (file.exists && !file.delete) {
- throw new Exception(s"Unable to delete ${file.getAbsolutePath}")
- }
- }
-
// default value is https for cloudant.com accounts
def getProtocol: String = {
if (protocol != null && !protocol.isEmpty) {
@@ -71,12 +60,11 @@ object TestUtils {
password
}
- lazy val shouldRunTests = {
+ def shouldRunTest(): Boolean = {
val isEnvSet = (username != null && !username.isEmpty) &&
(password != null && !password.isEmpty)
if (isEnvSet) {
// scalastyle:off println
- // Print this so that they are easily visible on the console and not hidden in the log4j logs.
println(
s"""
|Sql-cloudant tests that require Cloudant databases have been enabled by
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
index 028b719..98586c7 100644
--- a/sql-streaming-akka/pom.xml
+++ b/sql-streaming-akka/pom.xml
@@ -36,6 +36,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
deleted file mode 100644
index 996a0a1..0000000
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import java.io.{File, IOException}
-import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
-import java.nio.file.attribute.BasicFileAttributes
-
-object BahirUtils extends Logging {
-
- def recursiveDeleteDir(dir: File): Path = {
- Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
- override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
- try {
- Files.delete(file)
- } catch {
- case t: Throwable => log.warn("Failed to delete", t)
- }
- FileVisitResult.CONTINUE
- }
-
- override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
- try {
- Files.delete(dir)
- } catch {
- case t: Throwable => log.warn("Failed to delete", t)
- }
- FileVisitResult.CONTINUE
- }
- })
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
deleted file mode 100644
index 776ed5a..0000000
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/utils/Logging.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import org.slf4j.LoggerFactory
-
-trait Logging {
- final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
index f61b067..b04ed3c 100644
--- a/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
+++ b/sql-streaming-akka/src/test/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.types.StructType
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
@@ -50,7 +50,7 @@ class AkkaStreamSourceSuite extends SparkFunSuite with BeforeAndAfter {
after {
Persistence.close()
- BahirUtils.recursiveDeleteDir(tempDir)
+ FileHelper.deleteFileQuietly(tempDir)
}
protected val tmpDir: String = tempDir.getAbsolutePath
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml
index 242f24b..63497dc 100644
--- a/sql-streaming-mqtt/pom.xml
+++ b/sql-streaming-mqtt/pom.xml
@@ -36,6 +36,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
index f825eea..fed2601 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkEnv
import org.apache.bahir.utils.Logging
+import org.apache.bahir.utils.Retry
private[mqtt] object CachedMQTTClient extends Logging {
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
index 8654b88..f449e57 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.bahir.utils.Logging
+import org.apache.bahir.utils.Retry
class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions)
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
index 79fe7a2..a615d28 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTUtils.scala
@@ -82,36 +82,3 @@ private[mqtt] object MQTTUtils extends Logging {
(brokerUrl, clientId, topic, persistence, mqttConnectOptions, qos)
}
}
-
-private[mqtt] object Retry {
- /**
- * Retry invocation of given code.
- * @param attempts Number of attempts to try executing given code. -1 represents infinity.
- * @param pauseMs Number of backoff milliseconds.
- * @param retryExceptions Types of exceptions to retry.
- * @param code Function to execute.
- * @tparam A Type parameter.
- * @return Returns result of function execution or exception in case of failure.
- */
- def apply[A](attempts: Int, pauseMs: Long, retryExceptions: Class[_]*)(code: => A): A = {
- var result: Option[A] = None
- var success = false
- var remaining = attempts
- while ( ! success ) {
- try {
- remaining -= 1
- result = Some( code )
- success = true
- }
- catch {
- case e: Exception =>
- if (retryExceptions.contains(e.getClass) && (attempts == -1 || remaining > 0)) {
- Thread.sleep(pauseMs)
- } else {
- throw e
- }
- }
- }
- result.get
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
deleted file mode 100644
index 3d27b06..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/BahirUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import java.io.{File, IOException}
-import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
-import java.nio.file.attribute.BasicFileAttributes
-
-object BahirUtils extends Logging {
-
- def recursiveDeleteDir(dir: File): Path = {
- Files.walkFileTree(dir.toPath, new SimpleFileVisitor[Path]() {
- override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
- try {
- Files.delete(file)
- } catch {
- case t: Throwable => log.warn("Failed to delete", t)
- }
- FileVisitResult.CONTINUE
- }
-
- override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
- try {
- Files.delete(dir)
- } catch {
- case t: Throwable => log.warn("Failed to delete", t)
- }
- FileVisitResult.CONTINUE
- }
- })
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
deleted file mode 100644
index cbe97e9..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/utils/Logging.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.utils
-
-import org.slf4j.LoggerFactory
-
-
-trait Logging {
- final val log = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
index d1bbe18..0b6b80b 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/LocalMessageStoreSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter {
@@ -48,7 +48,7 @@ class LocalMessageStoreSuite extends SparkFunSuite with BeforeAndAfter {
after {
persistence.clear()
persistence.close()
- BahirUtils.recursiveDeleteDir(tempDir)
+ FileHelper.deleteFileQuietly(tempDir)
}
test("serialize and deserialize") {
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
index 14ea962..d72ba17 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
@@ -57,7 +57,7 @@ class MQTTStreamSinkSuite extends SparkFunSuite with SharedSparkContext with Bef
testClient.disconnect()
testClient.close()
mqttTestUtils.teardown()
- BahirUtils.recursiveDeleteDir(tempDir)
+ FileHelper.deleteFileQuietly(tempDir)
}
protected def createContextAndDF(messages: String*): (SQLContext, DataFrame) = {
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index bb82715..a7eb770 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
-import org.apache.bahir.utils.BahirUtils
+import org.apache.bahir.utils.FileHelper
class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with BeforeAndAfter {
@@ -50,7 +50,7 @@ class MQTTStreamSourceSuite extends SparkFunSuite with SharedSparkContext with B
after {
mqttTestUtils.teardown()
- BahirUtils.recursiveDeleteDir(tempDir)
+ FileHelper.deleteFileQuietly(tempDir)
}
protected val tmpDir: String = tempDir.getAbsolutePath
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
index 7929f33..5b94c7a 100644
--- a/streaming-akka/pom.xml
+++ b/streaming-akka/pom.xml
@@ -36,6 +36,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
----------------------------------------------------------------------
diff --git a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
index 4a6d578..1c8fd78 100644
--- a/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ b/streaming-akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -21,50 +21,45 @@ import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.util.Timeout;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import java.util.concurrent.TimeUnit;
-public class JavaAkkaUtilsSuite {
-
- @Test // tests the API, does not actually test data receiving
- public void testAkkaUtils() {
- JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- try {
- JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
- jsc, Props.create(JavaTestActor.class), "test");
- JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
- jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
- jsc,
- Props.create(JavaTestActor.class),
- "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
- new ActorSystemCreatorForTest(),
- SupervisorStrategy.defaultStrategy());
- } finally {
- jsc.stop();
+public class JavaAkkaUtilsSuite extends LocalJavaStreamingContext {
+ @Test
+ public void testAkkaUtils() {
+ // tests the API, does not actually test data receiving
+ JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+ ssc, Props.create(JavaTestActor.class), "test"
+ );
+ JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+ ssc, Props.create(JavaTestActor.class), "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2()
+ );
+ JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+ ssc, Props.create(JavaTestActor.class), "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2(), new ActorSystemCreatorForTest(),
+ SupervisorStrategy.defaultStrategy()
+ );
}
- }
}
class ActorSystemCreatorForTest implements Function0<ActorSystem> {
- @Override
- public ActorSystem call() {
- return null;
- }
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
}
-
class JavaTestActor extends JavaActorReceiver {
- @Override
- public void onReceive(Object message) throws Exception {
- store((String) message);
- store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS));
- }
+ @Override
+ public void onReceive(Object message) throws Exception {
+ store((String) message);
+ store((String) message, new Timeout(1000, TimeUnit.MILLISECONDS));
+ }
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml
index f23aa83..0f6d809 100644
--- a/streaming-mqtt/pom.xml
+++ b/streaming-mqtt/pom.xml
@@ -36,6 +36,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-mqtt/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index 6ef551b..d86aa98 100644
--- a/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/streaming-mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -77,6 +77,7 @@ class MQTTStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
}
ssc.stop()
}
+
test("mqtt input stream2") {
val sendMessage1 = "MQTT demo for spark streaming1"
val sendMessage2 = "MQTT demo for spark streaming2"
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
index ac0b925..464cfce 100644
--- a/streaming-pubnub/pom.xml
+++ b/streaming-pubnub/pom.xml
@@ -35,6 +35,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index 448fb5e..0000000
--- a/streaming-pubnub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
index 0885152..f6ecd37 100644
--- a/streaming-pubsub/pom.xml
+++ b/streaming-pubsub/pom.xml
@@ -36,6 +36,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-pubsub/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
deleted file mode 100644
index acdceb7..0000000
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubFunSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.pubsub
-
-import org.apache.spark.SparkFunSuite
-
-/**
- * Helper class that runs Google Cloud Pub/Sub real data transfer tests of
- * ignores them based on env variable is set or not.
- */
-trait PubsubFunSuite extends SparkFunSuite {
- import PubsubTestUtils._
-
- /** Run the test if environment variable is set or ignore the test */
- def testIfEnabled(testName: String)(testBody: => Unit) {
- if (shouldRunTests) {
- test(testName)(testBody)
- } else {
- ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
- }
- }
-
- /** Run the give body of code only if Kinesis tests are enabled */
- def runIfTestsEnabled(message: String)(body: => Unit): Unit = {
- if (shouldRunTests) {
- body
- } else {
- ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")(())
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
index d91c7e6..8f499cb 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubStreamSuite.scala
@@ -25,11 +25,12 @@ import scala.language.postfixOps
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually
+import org.apache.spark.ConditionalSparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
-class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAfter {
+class PubsubStreamSuite extends ConditionalSparkFunSuite with Eventually with BeforeAndAfter {
val batchDuration = Seconds(1)
@@ -50,7 +51,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
private var subForCreateFullName: String = null
override def beforeAll(): Unit = {
- runIfTestsEnabled("Prepare PubsubTestUtils") {
+ runIf(PubsubTestUtils.shouldRunTest) {
pubsubTestUtils = new PubsubTestUtils
topicFullName = pubsubTestUtils.getFullTopicPath(topicName)
subscriptionFullName = pubsubTestUtils.getFullSubscriptionPath(subscriptionName)
@@ -88,7 +89,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
}
- testIfEnabled("pubsub input stream") {
+ testIf("pubsub input stream", PubsubTestUtils.shouldRunTest) {
val receiveStream = PubsubUtils.createStream(
ssc, PubsubTestUtils.projectId, Some(topicName), subscriptionName,
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
@@ -112,7 +113,7 @@ class PubsubStreamSuite extends PubsubFunSuite with Eventually with BeforeAndAft
}
}
- testIfEnabled("pubsub input stream, create pubsub") {
+ testIf("pubsub input stream, create pubsub", PubsubTestUtils.shouldRunTest) {
val receiveStream = PubsubUtils.createStream(
ssc, PubsubTestUtils.projectId, Some(topicName), subForCreateName,
PubsubTestUtils.credential, StorageLevel.MEMORY_AND_DISK_SER_2)
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
----------------------------------------------------------------------
diff --git a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
index 9dd719a..39597ca 100644
--- a/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
+++ b/streaming-pubsub/src/test/scala/org/apache/spark/streaming/pubsub/PubsubTestUtils.scala
@@ -101,11 +101,10 @@ private[pubsub] object PubsubTestUtils {
val envVarNameForP12KeyPath = "GCP_TEST_P12_KEY_PATH"
val envVarNameForAccount = "GCP_TEST_ACCOUNT"
- lazy val shouldRunTests = {
- val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
+ def shouldRunTest(): Boolean = {
+ val isEnvSet = sys.env.get(envVarNameForEnablingTests).contains("1")
if (isEnvSet) {
// scalastyle:off println
- // Print this so that they are easily visible on the console and not hidden in the log4j logs.
println(
s"""
|Google Pub/Sub tests that actually send data has been enabled by setting the environment
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml
index 1f18100..2bf29b5 100644
--- a/streaming-twitter/pom.xml
+++ b/streaming-twitter/pom.xml
@@ -36,6 +36,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a..0000000
--- a/streaming-twitter/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index 92115f2..5aaf2fa 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -36,6 +36,13 @@
<dependencies>
<dependency>
+ <groupId>org.apache.bahir</groupId>
+ <artifactId>bahir-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/bahir/blob/0601698c/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
----------------------------------------------------------------------
diff --git a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index f9cee96..0000000
--- a/streaming-zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- final SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
-