Posted to commits@bahir.apache.org by lr...@apache.org on 2019/01/28 17:11:51 UTC
bahir git commit: [BAHIR-107] Upgrade to Scala 2.12 and Spark 2.4.0
Repository: bahir
Updated Branches:
refs/heads/master 5cfd7ac31 -> c51853d13
[BAHIR-107] Upgrade to Scala 2.12 and Spark 2.4.0
Closes #76
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/c51853d1
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/c51853d1
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/c51853d1
Branch: refs/heads/master
Commit: c51853d135ad2d9da67804259f4ed0e29223afb3
Parents: 5cfd7ac
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Tue Dec 11 06:57:46 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Jan 28 09:08:09 2019 -0800
----------------------------------------------------------------------
common/pom.xml | 4 +-
dev/change-scala-version.sh | 13 +--
dev/release-build.sh | 97 +++++++++++---------
distribution/pom.xml | 4 +-
pom.xml | 76 ++++++++-------
sql-cloudant/README.md | 2 +-
sql-cloudant/pom.xml | 4 +-
.../apache/bahir/cloudant/DefaultSource.scala | 2 +-
sql-streaming-akka/README.md | 2 +-
sql-streaming-akka/pom.xml | 4 +-
.../sql/streaming/akka/AkkaStreamSource.scala | 19 ++--
sql-streaming-mqtt/README.md | 2 +-
sql-streaming-mqtt/pom.xml | 4 +-
.../sql/streaming/mqtt/CachedMQTTClient.scala | 12 ++-
.../sql/streaming/mqtt/MQTTStreamSink.scala | 94 ++++++++-----------
.../sql/streaming/mqtt/MQTTStreamSource.scala | 15 +--
.../sql/mqtt/HdfsBasedMQTTStreamSource.scala | 6 +-
.../streaming/mqtt/MQTTStreamSinkSuite.scala | 32 +++----
.../streaming/mqtt/MQTTStreamSourceSuite.scala | 9 +-
streaming-akka/README.md | 2 +-
streaming-akka/pom.xml | 4 +-
streaming-mqtt/README.md | 2 +-
streaming-mqtt/pom.xml | 4 +-
streaming-pubnub/pom.xml | 4 +-
streaming-pubsub/pom.xml | 4 +-
streaming-twitter/README.md | 2 +-
streaming-twitter/pom.xml | 6 +-
streaming-zeromq/README.md | 2 +-
streaming-zeromq/pom.xml | 4 +-
29 files changed, 218 insertions(+), 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index d7757bb..0e443ec 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-common_2.11</artifactId>
+ <artifactId>bahir-common_2.12</artifactId>
<properties>
<sbt.project.name>bahir-common</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/change-scala-version.sh
----------------------------------------------------------------------
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index 7203ee7..09c97e2 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -1,4 +1,5 @@
#!/usr/bin/env bash
+
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -18,7 +19,7 @@
set -e
-VALID_VERSIONS=( 2.10 2.11 )
+VALID_VERSIONS=( 2.11 2.12 )
usage() {
echo "Usage: $(basename $0) [-h|--help] <version>
@@ -43,10 +44,10 @@ check_scala_version() {
check_scala_version "$TO_VERSION"
-if [ $TO_VERSION = "2.11" ]; then
- FROM_VERSION="2.10"
-else
+if [ $TO_VERSION = "2.12" ]; then
FROM_VERSION="2.11"
+else
+ FROM_VERSION="2.12"
fi
sed_i() {
@@ -59,7 +60,7 @@ BASEDIR=$(dirname $0)/..
find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
-exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \;
-# Also update <scala.binary.version> in parent POM
-# Match any scala binary version to ensure idempotency
+# also update <scala.binary.version> in parent POM
+# match any scala binary version to ensure idempotency
sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \
"$BASEDIR/pom.xml"
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/release-build.sh b/dev/release-build.sh
index a2dc5e2..89f0a52 100755
--- a/dev/release-build.sh
+++ b/dev/release-build.sh
@@ -38,11 +38,11 @@ to the staging release location.
--release-publish --tag="v2.3.0-rc1"
Publish the maven artifacts of a release to the Apache staging maven repository.
-Note that this will publish both Scala 2.10 and 2.11 artifacts.
+Note that this will publish both Scala 2.11 and 2.12 artifacts.
--release-snapshot
Publish the maven snapshot artifacts to Apache snapshots maven repository
-Note that this will publish both Scala 2.10 and 2.11 artifacts.
+Note that this will publish both Scala 2.11 and 2.12 artifacts.
OPTIONS
@@ -78,7 +78,7 @@ if [ $# -eq 0 ]; then
fi
-# Process each provided argument configuration
+# process each provided argument configuration
while [ "${1+defined}" ]; do
IFS="=" read -ra PARTS <<< "$1"
case "${PARTS[0]}" in
@@ -134,7 +134,7 @@ while [ "${1+defined}" ]; do
echo "Error: Unknown option: $1" >&2
exit 1
;;
- *) # No more options
+ *) # no more options
break
;;
esac
@@ -160,7 +160,7 @@ fi
if [[ "$RELEASE_PUBLISH" == "true" ]]; then
if [[ "$GIT_REF" && "$GIT_TAG" ]]; then
- echo "ERROR: Only one argumented permitted when publishing : --gitCommitHash or --gitTag"
+ echo "ERROR: Only one argument permitted when publishing : --gitCommitHash or --gitTag"
exit_with_usage
fi
if [[ -z "$GIT_REF" && -z "$GIT_TAG" ]]; then
@@ -175,11 +175,11 @@ if [[ "$RELEASE_PUBLISH" == "true" && "$DRY_RUN" ]]; then
fi
if [[ "$RELEASE_SNAPSHOT" == "true" && "$DRY_RUN" ]]; then
- echo "ERROR: --dryRun not supported for --release-publish"
+ echo "ERROR: --dryRun not supported for --release-snapshot"
exit_with_usage
fi
-# Commit ref to checkout when building
+# commit ref to checkout when building
GIT_REF=${GIT_REF:-master}
if [[ "$RELEASE_PUBLISH" == "true" && "$GIT_TAG" ]]; then
GIT_REF="tags/$GIT_TAG"
@@ -200,11 +200,10 @@ fi
RELEASE_STAGING_LOCATION="https://dist.apache.org/repos/dist/dev/bahir/bahir-spark"
-
echo " "
-echo "-------------------------------------------------------------"
-echo "------- Release preparation with the following parameters ---"
-echo "-------------------------------------------------------------"
+echo "-----------------------------------------------------------------"
+echo "------- Release preparation with the following parameters -------"
+echo "-----------------------------------------------------------------"
echo "Executing ==> $GOAL"
echo "Git reference ==> $GIT_REF"
echo "release version ==> $RELEASE_VERSION"
@@ -220,7 +219,6 @@ echo $RELEASE_STAGING_LOCATION
echo " "
function checkout_code {
- # Checkout code
rm -rf target
mkdir target
cd target
@@ -231,33 +229,41 @@ function checkout_code {
git_hash=`git rev-parse --short HEAD`
echo "Checked out Bahir git hash $git_hash"
- cd "$BASE_DIR" #return to base dir
+ cd "$BASE_DIR" # return to base dir
}
if [[ "$RELEASE_PREPARE" == "true" ]]; then
echo "Preparing release $RELEASE_VERSION"
- # Checkout code
+ # checkout code
checkout_code
cd target/bahir
- # Build and prepare the release
- $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN -Darguments="-Dgpg.passphrase=\"$GPG_PASSPHRASE\" -DskipTests" -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG"
+ # test with scala 2.11 and 2.12
+ ./dev/change-scala-version.sh 2.11
+ $MVN $PUBLISH_PROFILES clean test -Dscala-2.11 || exit 1
+ ./dev/change-scala-version.sh 2.12
+ $MVN $PUBLISH_PROFILES clean test || exit 1
+
+ # build and prepare the release
+ $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN \
+ -DskipTests=true -Dgpg.passphrase="$GPG_PASSPHRASE" \
+ -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG"
- cd .. #exit bahir
+ cd .. # exit bahir
if [ -z "$DRY_RUN" ]; then
cd "$BASE_DIR/target/bahir"
git checkout $RELEASE_TAG
git clean -d -f -x
- $MVN $PUBLISH_PROFILES clean install -DskiptTests -Darguments="-DskipTests"
+ $MVN $PUBLISH_PROFILES clean install -DskipTests=true
cd "$BASE_DIR/target"
svn co $RELEASE_STAGING_LOCATION svn-bahir
mkdir -p svn-bahir/$RELEASE_VERSION-$RELEASE_RC
cp bahir/distribution/target/*.tar.gz svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
- cp bahir/distribution/target/*.zip svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
+ cp bahir/distribution/target/*.zip svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
cd svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
rm -f *.asc
@@ -265,49 +271,50 @@ if [[ "$RELEASE_PREPARE" == "true" ]]; then
rm -f *.sha*
for i in *.zip *.tar.gz; do shasum --algorithm 512 $i > $i.sha512; done
- cd .. #exit $RELEASE_VERSION-$RELEASE_RC/
+ cd .. # exit $RELEASE_VERSION-$RELEASE_RC
svn add $RELEASE_VERSION-$RELEASE_RC/
svn ci -m"Apache Bahir $RELEASE_VERSION-$RELEASE_RC"
fi
-
- cd "$BASE_DIR" #exit target
-
+ cd "$BASE_DIR" # exit target
exit 0
fi
if [[ "$RELEASE_PUBLISH" == "true" ]]; then
echo "Preparing release $RELEASE_VERSION"
- # Checkout code
+ # checkout code
checkout_code
cd target/bahir
- #Deploy default scala 2.11
- mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE
-
- #mvn clean
+ DEPLOYMENT_REPOSITORY="apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2"
- #Deploy scala 2.10
- #./dev/change-scala-version.sh 2.10
- #mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE
+ # deploy default scala 2.12
+ $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+ -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+ -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE
- cd "$BASE_DIR" #exit target
+ # deploy scala 2.11
+ ./dev/change-scala-version.sh 2.11
+ $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+ -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+ -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11
+ cd "$BASE_DIR" # exit target
exit 0
fi
if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then
- # Checkout code
+ # checkout code
checkout_code
cd target/bahir
- CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version \
- | grep -v INFO | grep -v WARNING | grep -v Download)
+ DEPLOYMENT_REPOSITORY="apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots"
+ CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version | grep -v INFO | grep -v WARNING | grep -v Download)
- # Publish Bahir Snapshots to Maven snapshot repo
+ # publish Bahir snapshots to maven repository
echo "Deploying Bahir SNAPSHOT at '$GIT_REF' ($git_hash)"
echo "Publish version is $CURRENT_VERSION"
if [[ ! $CURRENT_VERSION == *"SNAPSHOT"* ]]; then
@@ -316,19 +323,23 @@ if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then
exit 1
fi
- #Deploy default scala 2.11
- $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE
+ # deploy default scala 2.12
+ $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+ -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+ -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE
- #Deploy scala 2.10
- ./dev/change-scala-version.sh 2.10
- $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE
+ # deploy scala 2.11
+ ./dev/change-scala-version.sh 2.11
+ $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+ -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+ -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11
- cd "$BASE_DIR" #exit target
+ cd "$BASE_DIR" # exit target
exit 0
fi
-cd "$BASE_DIR" #return to base dir
+cd "$BASE_DIR" # return to base directory
rm -rf target
echo "ERROR: wrong execution goals"
exit_with_usage
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 18ba854..05a311f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-spark-distribution_2.11</artifactId>
+ <artifactId>bahir-spark-distribution_2.12</artifactId>
<packaging>pom</packaging>
<name>Apache Bahir - Spark Extensions Distribution</name>
<url>http://bahir.apache.org/</url>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af70a76..ec6b353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
<version>18</version>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache Bahir - Parent POM</name>
@@ -90,18 +90,19 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <skipTests>false</skipTests>
<!-- General project dependencies version -->
<java.version>1.8</java.version>
<maven.version>3.3.9</maven.version>
- <scala.version>2.11.12</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.12.7</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<!-- Spark version -->
- <spark.version>2.3.2</spark.version>
+ <spark.version>2.4.0</spark.version>
<!-- Hadoop version -->
<hadoop.version>2.6.5</hadoop.version>
@@ -179,7 +180,7 @@
published POMs are flattened and do not contain variables. Without this dependency, some
subprojects' published POMs would contain variables like ${scala.binary.version} that will
be substituted according to the default properties instead of the ones determined by the
- profiles that were active during publishing, causing the Scala 2.10 build's POMs to have 2.11
+ profiles that were active during publishing, causing the Scala 2.11 build's POMs to have 2.12
dependencies due to the incorrect substitutions. By ensuring that maven-shade runs for all
subprojects, we eliminate this problem because the substitutions are baked into the final POM.
@@ -204,6 +205,17 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <!--
+ Temporary workaround. Spark Core 2.4.0 with Scala 2.12 depends on Apache Avro 1.8.2,
+ which pulls in transitive dependency of Paranamer 2.7. All integration tests fail
+ with error message same as SPARK-22128. Manually upgrading Paranamer to 2.8.
+ -->
+ <dependency>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ <version>2.8</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
@@ -213,6 +225,11 @@
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
+ <!-- Temporary fix, see above. -->
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
@@ -308,7 +325,7 @@
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>3.2.11</version>
+ <version>3.5.3</version>
</dependency>
<dependency>
@@ -982,6 +999,9 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <skipTests>${skipTests}</skipTests>
+ </configuration>
</plugin>
<!-- Build test-jar's for all projects, since some projects depend on tests from others -->
<plugin>
@@ -1040,33 +1060,20 @@
<profile>
<id>distribution</id>
-
<modules>
<module>distribution</module>
</modules>
</profile>
- <!--
<profile>
- <id>scala-2.10</id>
+ <id>scala-2.11</id>
<activation>
- <property><name>scala-2.10</name></property>
+ <property><name>scala-2.11</name></property>
</activation>
<properties>
- <scala.version>2.10.6</scala.version>
- <scala.binary.version>2.10</scala.binary.version>
- <jline.version>${scala.version}</jline.version>
- <jline.groupid>org.scala-lang</jline.groupid>
+ <scala.version>2.11.12</scala.version>
+ <scala.binary.version>2.11</scala.binary.version>
</properties>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>${jline.groupid}</groupId>
- <artifactId>jline</artifactId>
- <version>${jline.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
<build>
<plugins>
<plugin>
@@ -1082,7 +1089,7 @@
<rules>
<bannedDependencies>
<excludes combine.children="append">
- <exclude>*:*_2.11</exclude>
+ <exclude>*:*_2.12</exclude>
</excludes>
</bannedDependencies>
</rules>
@@ -1095,13 +1102,13 @@
</profile>
<profile>
- <id>scala-2.11</id>
+ <id>scala-2.12</id>
<activation>
- <property><name>!scala-2.10</name></property>
+ <property><name>!scala-2.11</name></property>
</activation>
<properties>
- <scala.version>2.11.8</scala.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <scala.version>2.12.7</scala.version>
+ <scala.binary.version>2.12</scala.binary.version>
</properties>
<build>
<plugins>
@@ -1118,7 +1125,7 @@
<rules>
<bannedDependencies>
<excludes combine.children="append">
- <exclude>*:*_2.10</exclude>
+ <exclude>*:*_2.11</exclude>
</excludes>
</bannedDependencies>
</rules>
@@ -1129,17 +1136,6 @@
</plugins>
</build>
</profile>
- -->
-
- <profile>
- <id>test-java-home</id>
- <activation>
- <property><name>env.JAVA_HOME</name></property>
- </activation>
- <properties>
- <test.java.home>${env.JAVA_HOME}</test.java.home>
- </properties>
- </profile>
</profiles>
</project>
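For context on the Paranamer pin above: Paranamer resolves constructor parameter names by reading compiled class files, and version 2.7 predates the Scala 2.12 bytecode layout, which is why tests fail with the symptom reported in SPARK-22128. A minimal sketch of the failure mode, using json4s (which, like Avro, resolves parameter names through Paranamer); the case class and JSON literal are illustrative, not taken from this commit:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ParanamerSketch {
  // Illustrative case class; any product type extracted from JSON
  // exercises Paranamer's bytecode reader.
  case class Record(id: Long, payload: String)

  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats
    // json4s looks up constructor parameter names via Paranamer. On
    // Scala 2.12 class files, Paranamer 2.7 could fail here with an
    // ArrayIndexOutOfBoundsException (the SPARK-22128 symptom), while
    // Paranamer 2.8 reads them correctly.
    val record = parse("""{"id":1,"payload":"a"}""").extract[Record]
    println(record)
  }
}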
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index b651990..d315d7f 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -37,7 +37,7 @@ Submit a job in Scala:
spark-submit --class "<your class>" --master local[4] --packages org.apache.bahir:spark-sql-cloudant__{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} <path to spark-sql-cloudant jar>
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Configuration options
The configuration is obtained in the following sequence:
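The cross-publishing note above means the Scala suffix in the artifact name must match the Scala build of the target cluster. In an sbt project the %% operator selects the suffix automatically; a one-line illustration (the coordinates assume a hypothetical released 2.4.0 artifact):

// Resolves to spark-sql-cloudant_2.11 or spark-sql-cloudant_2.12,
// depending on the project's scalaVersion.
libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.4.0"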
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index d81232a..a14862b 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-sql-cloudant_2.11</artifactId>
+ <artifactId>spark-sql-cloudant_2.12</artifactId>
<properties>
<sbt.project.name>spark-sql-cloudant</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 47643cc..84babdd 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -76,7 +76,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
logger.warn("Database " + config.getDbname +
": nothing was saved because the number of records was 0!")
} else {
- val result = data.toJSON.foreachPartition { x =>
+ data.toJSON.foreachPartition { x: Iterator[String] =>
val list = x.toList // Has to pass as List, Iterator results in 0 data
dataAccess.saveAll(list)
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
index 29685ee..a29979b 100644
--- a/sql-streaming-akka/README.md
+++ b/sql-streaming-akka/README.md
@@ -22,7 +22,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Examples
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
index 98586c7..6d50325 100644
--- a/sql-streaming-akka/pom.xml
+++ b/sql-streaming-akka/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+ <artifactId>spark-sql-streaming-akka_2.12</artifactId>
<properties>
<sbt.project.name>sql-streaming-akka</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
index 3f2101c..f20a917 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -41,10 +41,12 @@ import com.typesafe.config.ConfigFactory
import org.rocksdb.{Options, RocksDB}
import org.apache.spark.SparkEnv
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.UTF8StringBuilder
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
@@ -262,7 +264,7 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT
- override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
+ override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
assert(startOffset != null && endOffset != null,
"start offset and end offset should already be set before create read tasks.")
@@ -283,8 +285,9 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
(0 until numPartitions).map { i =>
val slice = slices(i)
- new DataReaderFactory[Row] {
- override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+ new InputPartition[InternalRow] {
+ override def createPartitionReader(): InputPartitionReader[InternalRow] =
+ new InputPartitionReader[InternalRow] {
private var currentIdx = -1
override def next(): Boolean = {
@@ -292,8 +295,10 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
currentIdx < slice.size
}
- override def get(): Row = {
- Row.fromTuple(slice(currentIdx))
+ override def get(): InternalRow = {
+ val builder = new UTF8StringBuilder()
+ builder.append(slice(currentIdx)._1)
+ InternalRow(builder.build(), slice(currentIdx)._2.getTime)
}
override def close(): Unit = {}
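The change above migrates the Akka source from the Row-based DataReader/DataReaderFactory API to Spark 2.4's InputPartition/InputPartitionReader API, returning InternalRow (hence the UTF8StringBuilder for the string column). User-facing usage should be unchanged; a hedged sketch, assuming the provider class and option name documented in the module's README:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]").appName("akka-source-example").getOrCreate()

// Schema stays (value: String, timestamp: Timestamp); only the internal
// reader plumbing moved to planInputPartitions/InputPartitionReader.
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
  .option("urlOfPublisher", "akka.tcp://test@localhost:2552/user/FeederActor") // placeholder URL
  .load()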
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md
index 721b544..0fbf63e 100644
--- a/sql-streaming-mqtt/README.md
+++ b/sql-streaming-mqtt/README.md
@@ -22,7 +22,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Examples
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml
index 3f818f8..53ccc32 100644
--- a/sql-streaming-mqtt/pom.xml
+++ b/sql-streaming-mqtt/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+ <artifactId>spark-sql-streaming-mqtt_2.12</artifactId>
<properties>
<sbt.project.name>sql-streaming-mqtt</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
index 8925e93..78eae52 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
@@ -94,8 +94,16 @@ private[mqtt] object CachedMQTTClient extends Logging {
private def closeMqttClient(params: Seq[(String, String)],
client: MqttClient, persistence: MqttClientPersistence): Unit = {
try {
- client.disconnect()
- persistence.close()
+ if (client.isConnected) {
+ client.disconnect()
+ }
+ try {
+ persistence.close()
+ } catch {
+ case NonFatal(e) => log.warn(
+ s"Error while closing MQTT persistent store ${e.getMessage}", e
+ )
+ }
client.close()
} catch {
case NonFatal(e) => log.warn(s"Error while closing MQTT client ${e.getMessage}", e)
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
index 846765c..23385f4 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
@@ -17,18 +17,17 @@
package org.apache.bahir.sql.streaming.mqtt
-import java.nio.charset.Charset
-
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.eclipse.paho.client.mqttv3.MqttException
import org.apache.spark.SparkEnv
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.sources.{PackedRowCommitMessage, PackedRowWriterFactory}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -39,67 +38,50 @@ import org.apache.bahir.utils.Retry
class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions)
extends StreamWriter with Logging {
- private lazy val publishAttempts: Int =
- SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1)
- private lazy val publishBackoff: Long =
- SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")
+ override def createWriterFactory(): DataWriterFactory[InternalRow] = {
+ // Skipping client identifier as single batch can be distributed to multiple
+ // Spark worker process. MQTT server does not support two connections
+ // declaring same client ID at given point in time.
+ val params = parameters.asMap().asScala.filterNot(
+ _._1.equalsIgnoreCase("clientId")
+ )
+ MQTTDataWriterFactory(params)
+ }
- assert(SparkSession.getActiveSession.isDefined)
- private val spark = SparkSession.getActiveSession.get
+ override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
- private var topic: String = _
- private var qos: Int = -1
+ override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
- initialize()
- private def initialize(): Unit = {
- val (_, _, topic_, _, _, qos_, _, _, _) = MQTTUtils.parseConfigParams(
- collection.immutable.HashMap() ++ parameters.asMap().asScala
- )
- topic = topic_
- qos = qos_
- }
+case class MQTTDataWriterFactory(config: mutable.Map[String, String])
+ extends DataWriterFactory[InternalRow] {
+ override def createDataWriter(
+ partitionId: Int, taskId: Long, epochId: Long
+ ): DataWriter[InternalRow] = new MQTTDataWriter(config)
+}
- override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+case object MQTTWriterCommitMessage extends WriterCommitMessage
- override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
- commit(messages)
- }
+class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[InternalRow] {
+ private lazy val publishAttempts: Int =
+ SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1)
+ private lazy val publishBackoff: Long =
+ SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
- val rows = messages.collect {
- case PackedRowCommitMessage(rs) => rs
- }.flatten
+ private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap)
- // Skipping client identifier as single batch can be distributed to multiple
- // Spark worker process. MQTT server does not support two connections
- // declaring same client ID at given point in time.
- val params_ = Seq() ++ parameters.asMap().asScala.toSeq.filterNot(
- _._1.equalsIgnoreCase("clientId")
- )
- // IMPL Note: Had to declare new value reference due to serialization requirements.
- val topic_ = topic
- val qos_ = qos
- val publishAttempts_ = publishAttempts
- val publishBackoff_ = publishBackoff
-
- val data = spark.createDataFrame(rows.toList.asJava, schema)
- data.foreachPartition (
- iterator => iterator.foreach(
- row => {
- val client = CachedMQTTClient.getOrCreate(params_.toMap)
- val message = row.mkString.getBytes(Charset.defaultCharset())
- Retry(publishAttempts_, publishBackoff_, classOf[MqttException]) {
- // In case of errors, retry sending the message.
- client.publish(topic_, message, qos_, false)
- }
- }
- )
- )
+ override def write(record: InternalRow): Unit = {
+ val client = CachedMQTTClient.getOrCreate(config.toMap)
+ val message = record.getBinary(0)
+ Retry(publishAttempts, publishBackoff, classOf[MqttException]) {
+ // In case of errors, retry sending the message.
+ client.publish(topic, message, qos, false)
+ }
}
- override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+ override def commit(): WriterCommitMessage = MQTTWriterCommitMessage
- override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+ override def abort(): Unit = {}
}
case class MQTTRelation(override val sqlContext: SQLContext, data: DataFrame)
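The rewrite above replaces the driver-side commit(messages) path (PackedRowWriterFactory collecting Rows into commit messages) with a per-task DataWriter[InternalRow] that publishes each record as it is written, so no batch data flows back through commit messages and clientId is dropped from the per-task config to avoid duplicate MQTT client IDs across workers. From the query side the sink is used as before; a hedged sketch, given a streaming DataFrame df with a string value column and assuming the provider class name and broker URL convention from the module's README (the topic and localStorage options appear in the test suite below):

// Each executor task gets an MQTTDataWriter, which publishes record
// payloads (record.getBinary(0)) straight to the broker with retries.
val query = df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider")
  .option("topic", "test")
  .option("localStorage", "/tmp/mqtt-sink") // illustrative persistence dir
  .start("tcp://localhost:1883")            // illustrative broker URL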
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index 7146ecc..e1314ae 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -17,6 +17,7 @@
package org.apache.bahir.sql.streaming.mqtt
+import java.{util => jutil}
import java.nio.charset.Charset
import java.sql.Timestamp
import java.text.SimpleDateFormat
@@ -31,9 +32,10 @@ import scala.collection.mutable.ListBuffer
import org.eclipse.paho.client.mqttv3._
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.types._
@@ -169,7 +171,7 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
MQTTStreamConstants.SCHEMA_DEFAULT
}
- override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+ override def planInputPartitions(): jutil.List[InputPartition[InternalRow]] = {
val rawList: IndexedSeq[MQTTMessage] = synchronized {
val sliceStart = LongOffset.convert(startOffset).get.offset + 1
val sliceEnd = LongOffset.convert(endOffset).get.offset + 1
@@ -186,8 +188,9 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
(0 until numPartitions).map { i =>
val slice = slices(i)
- new DataReaderFactory[Row] {
- override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+ new InputPartition[InternalRow] {
+ override def createPartitionReader(): InputPartitionReader[InternalRow] =
+ new InputPartitionReader[InternalRow] {
private var currentIdx = -1
override def next(): Boolean = {
@@ -195,8 +198,8 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
currentIdx < slice.size
}
- override def get(): Row = {
- Row(slice(currentIdx).id, slice(currentIdx).topic,
+ override def get(): InternalRow = {
+ InternalRow(slice(currentIdx).id, slice(currentIdx).topic,
slice(currentIdx).payload, slice(currentIdx).timestamp)
}
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
index e6e202b..fd39557 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.mqtt
import java.io.IOException
-import java.sql.Timestamp
import java.util.Calendar
import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -28,10 +27,9 @@ import org.eclipse.paho.client.mqttv3._
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.FileContextManager
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
@@ -191,7 +189,7 @@ class HdfsBasedMQTTStreamSource(
// recover message data file offset from hdfs
val dataPath = new Path(receivedDataPath)
if (fs.exists(dataPath)) {
- val fileManager = new FileContextManager(dataPath, hadoopConfig)
+ val fileManager = CheckpointFileManager.create(dataPath, hadoopConfig)
val dataFileIndexs = fileManager.list(dataPath, new PathFilter {
private def isBatchFile(path: Path) = {
try {
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
index ecdd942..ab24cb3 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
@@ -22,19 +22,17 @@ import java.net.ConnectException
import java.util
import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttException
import org.scalatest.BeforeAndAfter
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
-import org.apache.spark.{SharedSparkContext, SparkEnv, SparkException, SparkFunSuite}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.{SharedSparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.sources.PackedRowCommitMessage
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.bahir.utils.FileHelper
@@ -56,7 +54,8 @@ class MQTTStreamSinkSuite(_ssl: Boolean) extends SparkFunSuite
}
after {
- testClient.disconnect()
+ CachedMQTTClient.clear()
+ testClient.disconnectForcibly()
testClient.close()
mqttTestUtils.teardown()
FileHelper.deleteFileQuietly(tempDir)
@@ -95,15 +94,12 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
"topic" -> "test",
"localStorage" -> tempDir.getAbsoluteFile.toString
)
- val schema = StructType(StructField("value", StringType) :: Nil)
- val messages : Array[Row] = Array(new GenericRowWithSchema(Array("value1"), schema))
- val thrown: Exception = intercept[SparkException] {
+ val thrown: Exception = intercept[MqttException] {
provider.createStreamWriter(
- "query1", schema, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
- ).commit(1, Array(PackedRowCommitMessage(messages)))
+ "query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
+ ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
}
- // SparkException -> MqttException -> ConnectException
- assert(thrown.getCause.getCause.isInstanceOf[ConnectException])
+ assert(thrown.getCause.isInstanceOf[ConnectException])
}
test("basic usage") {
@@ -111,7 +107,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
val msg2 = "MQTT is a message queue."
val (_, dataFrame) = createContextAndDF(msg1, msg2)
- sendToMQTT(dataFrame).awaitTermination(3000)
+ sendToMQTT(dataFrame).awaitTermination(5000)
assert(Set(msg1, msg2).equals(messages.values.toSet))
}
@@ -120,7 +116,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
val msg = List.tabulate(100)(n => "Hello, World!" + n)
val (_, dataFrame) = createContextAndDF(msg: _*)
- sendToMQTT(dataFrame).awaitTermination(3000)
+ sendToMQTT(dataFrame).awaitTermination(5000)
assert(Set(msg: _*).equals(messages.values.toSet))
}
@@ -134,13 +130,13 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
intercept[IllegalArgumentException] {
provider.createStreamWriter(
"query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
- )
+ ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
}
intercept[IllegalArgumentException] {
provider.createStreamWriter(
"query1", null, OutputMode.Complete(),
new DataSourceOptions(new util.HashMap[String, String])
- )
+ ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
}
}
}
@@ -163,7 +159,7 @@ class StressTestMQTTSink extends MQTTStreamSinkSuite(false) {
val freeMemory: Long = Runtime.getRuntime.freeMemory()
log.info(s"Available memory before test run is ${freeMemory / (1024 * 1024)}MB.")
val noOfMsgs: Int = 200
- val noOfBatches: Int = 10
+ val noOfBatches: Int = 5
val messageBuilder = new StringBuilder()
for (i <- 0 until (500 * 1024)) yield messageBuilder.append(((i % 26) + 65).toChar)
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index c4e340c..39cf0df 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.time.Span
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
@@ -157,15 +158,15 @@ class BasicMQTTSourceSuite extends MQTTStreamSourceSuite {
// Clear in-memory cache to simulate recovery.
source.messages.clear()
source.setOffsetRange(Optional.empty(), Optional.empty())
- var message: Row = null
- for (f <- source.createDataReaderFactories().asScala) {
- val dataReader = f.createDataReader()
+ var message: InternalRow = null
+ for (f <- source.planInputPartitions().asScala) {
+ val dataReader = f.createPartitionReader()
if (dataReader.next()) {
message = dataReader.get()
}
}
source.commit(source.getCurrentOffset)
- assert(payload == new String(message.getAs[Array[Byte]](2), "UTF-8"))
+ assert(payload == new String(message.getBinary(2), "UTF-8"))
}
test("no server up") {
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/streaming-akka/README.md b/streaming-akka/README.md
index f57583e..bff9c25 100644
--- a/streaming-akka/README.md
+++ b/streaming-akka/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Examples
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
index 5b94c7a..0d4e42f 100644
--- a/streaming-akka/pom.xml
+++ b/streaming-akka/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-streaming-akka_2.11</artifactId>
+ <artifactId>spark-streaming-akka_2.12</artifactId>
<properties>
<sbt.project.name>streaming-akka</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 05542de..811f822 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Configuration options.
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml
index 0f6d809..44ed1e0 100644
--- a/streaming-mqtt/pom.xml
+++ b/streaming-mqtt/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-streaming-mqtt_2.11</artifactId>
+ <artifactId>spark-streaming-mqtt_2.12</artifactId>
<properties>
<sbt.project.name>streaming-mqtt</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
index 464cfce..683cb29 100644
--- a/streaming-pubnub/pom.xml
+++ b/streaming-pubnub/pom.xml
@@ -19,13 +19,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<groupId>org.apache.bahir</groupId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>spark-streaming-pubnub_2.11</artifactId>
+ <artifactId>spark-streaming-pubnub_2.12</artifactId>
<properties>
<sbt.project.name>streaming-pubnub</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
index f6ecd37..3f92983 100644
--- a/streaming-pubsub/pom.xml
+++ b/streaming-pubsub/pom.xml
@@ -19,14 +19,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<groupId>org.apache.bahir</groupId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-streaming-pubsub_2.11</artifactId>
+ <artifactId>spark-streaming-pubsub_2.12</artifactId>
<properties>
<sbt.project.name>streaming-pubsub</sbt.project.name>
</properties>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/README.md
----------------------------------------------------------------------
diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md
index 4123ea9..1703606 100644
--- a/streaming-twitter/README.md
+++ b/streaming-twitter/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Examples
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml
index 2bf29b5..f031771 100644
--- a/streaming-twitter/pom.xml
+++ b/streaming-twitter/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-streaming-twitter_2.11</artifactId>
+ <artifactId>spark-streaming-twitter_2.12</artifactId>
<properties>
<sbt.project.name>streaming-twitter</sbt.project.name>
</properties>
@@ -72,7 +72,7 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>algebird-core_${scala.binary.version}</artifactId>
- <version>0.11.0</version>
+ <version>0.12.4</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/README.md
----------------------------------------------------------------------
diff --git a/streaming-zeromq/README.md b/streaming-zeromq/README.md
index 8ced539..8d57424 100644
--- a/streaming-zeromq/README.md
+++ b/streaming-zeromq/README.md
@@ -24,7 +24,7 @@ For example, to include it when starting the spark shell:
Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
The `--packages` argument can also be used with `bin/spark-submit`.
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
## Examples
http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index 5aaf2fa..ec1c4a7 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -20,13 +20,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>bahir-parent_2.11</artifactId>
+ <artifactId>bahir-parent_2.12</artifactId>
<version>2.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.bahir</groupId>
- <artifactId>spark-streaming-zeromq_2.11</artifactId>
+ <artifactId>spark-streaming-zeromq_2.12</artifactId>
<properties>
<sbt.project.name>streaming-zeromq</sbt.project.name>
</properties>