Posted to commits@bahir.apache.org by lr...@apache.org on 2019/01/28 17:11:51 UTC

bahir git commit: [BAHIR-107] Upgrade to Scala 2.12 and Spark 2.4.0

Repository: bahir
Updated Branches:
  refs/heads/master 5cfd7ac31 -> c51853d13


[BAHIR-107] Upgrade to Scala 2.12 and Spark 2.4.0

Closes #76


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/c51853d1
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/c51853d1
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/c51853d1

Branch: refs/heads/master
Commit: c51853d135ad2d9da67804259f4ed0e29223afb3
Parents: 5cfd7ac
Author: Lukasz Antoniak <lu...@gmail.com>
Authored: Tue Dec 11 06:57:46 2018 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Jan 28 09:08:09 2019 -0800

----------------------------------------------------------------------
 common/pom.xml                                  |  4 +-
 dev/change-scala-version.sh                     | 13 +--
 dev/release-build.sh                            | 97 +++++++++++---------
 distribution/pom.xml                            |  4 +-
 pom.xml                                         | 76 ++++++++-------
 sql-cloudant/README.md                          |  2 +-
 sql-cloudant/pom.xml                            |  4 +-
 .../apache/bahir/cloudant/DefaultSource.scala   |  2 +-
 sql-streaming-akka/README.md                    |  2 +-
 sql-streaming-akka/pom.xml                      |  4 +-
 .../sql/streaming/akka/AkkaStreamSource.scala   | 19 ++--
 sql-streaming-mqtt/README.md                    |  2 +-
 sql-streaming-mqtt/pom.xml                      |  4 +-
 .../sql/streaming/mqtt/CachedMQTTClient.scala   | 12 ++-
 .../sql/streaming/mqtt/MQTTStreamSink.scala     | 94 ++++++++-----------
 .../sql/streaming/mqtt/MQTTStreamSource.scala   | 15 +--
 .../sql/mqtt/HdfsBasedMQTTStreamSource.scala    |  6 +-
 .../streaming/mqtt/MQTTStreamSinkSuite.scala    | 32 +++----
 .../streaming/mqtt/MQTTStreamSourceSuite.scala  |  9 +-
 streaming-akka/README.md                        |  2 +-
 streaming-akka/pom.xml                          |  4 +-
 streaming-mqtt/README.md                        |  2 +-
 streaming-mqtt/pom.xml                          |  4 +-
 streaming-pubnub/pom.xml                        |  4 +-
 streaming-pubsub/pom.xml                        |  4 +-
 streaming-twitter/README.md                     |  2 +-
 streaming-twitter/pom.xml                       |  6 +-
 streaming-zeromq/README.md                      |  2 +-
 streaming-zeromq/pom.xml                        |  4 +-
 29 files changed, 218 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index d7757bb..0e443ec 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>bahir-common_2.11</artifactId>
+  <artifactId>bahir-common_2.12</artifactId>
   <properties>
     <sbt.project.name>bahir-common</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/change-scala-version.sh
----------------------------------------------------------------------
diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh
index 7203ee7..09c97e2 100755
--- a/dev/change-scala-version.sh
+++ b/dev/change-scala-version.sh
@@ -1,4 +1,5 @@
 #!/usr/bin/env bash
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -18,7 +19,7 @@
 
 set -e
 
-VALID_VERSIONS=( 2.10 2.11 )
+VALID_VERSIONS=( 2.11 2.12 )
 
 usage() {
   echo "Usage: $(basename $0) [-h|--help] <version>
@@ -43,10 +44,10 @@ check_scala_version() {
 
 check_scala_version "$TO_VERSION"
 
-if [ $TO_VERSION = "2.11" ]; then
-  FROM_VERSION="2.10"
-else
+if [ $TO_VERSION = "2.12" ]; then
   FROM_VERSION="2.11"
+else
+  FROM_VERSION="2.12"
 fi
 
 sed_i() {
@@ -59,7 +60,7 @@ BASEDIR=$(dirname $0)/..
 find "$BASEDIR" -name 'pom.xml' -not -path '*target*' -print \
   -exec bash -c "sed_i 's/\(artifactId.*\)_'$FROM_VERSION'/\1_'$TO_VERSION'/g' {}" \;
 
-# Also update <scala.binary.version> in parent POM
-# Match any scala binary version to ensure idempotency
+# also update <scala.binary.version> in parent POM
+# match any scala binary version to ensure idempotency
 sed_i '1,/<scala\.binary\.version>[0-9]*\.[0-9]*</s/<scala\.binary\.version>[0-9]*\.[0-9]*</<scala.binary.version>'$TO_VERSION'</' \
   "$BASEDIR/pom.xml"

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/dev/release-build.sh
----------------------------------------------------------------------
diff --git a/dev/release-build.sh b/dev/release-build.sh
index a2dc5e2..89f0a52 100755
--- a/dev/release-build.sh
+++ b/dev/release-build.sh
@@ -38,11 +38,11 @@ to the staging release location.
 
 --release-publish --tag="v2.3.0-rc1"
 Publish the maven artifacts of a release to the Apache staging maven repository.
-Note that this will publish both Scala 2.10 and 2.11 artifacts.
+Note that this will publish both Scala 2.11 and 2.12 artifacts.
 
 --release-snapshot
 Publish the maven snapshot artifacts to Apache snapshots maven repository
-Note that this will publish both Scala 2.10 and 2.11 artifacts.
+Note that this will publish both Scala 2.11 and 2.12 artifacts.
 
 OPTIONS
 
@@ -78,7 +78,7 @@ if [ $# -eq 0 ]; then
 fi
 
 
-# Process each provided argument configuration
+# process each provided argument configuration
 while [ "${1+defined}" ]; do
   IFS="=" read -ra PARTS <<< "$1"
   case "${PARTS[0]}" in
@@ -134,7 +134,7 @@ while [ "${1+defined}" ]; do
      echo "Error: Unknown option: $1" >&2
      exit 1
      ;;
-    *)  # No more options
+    *)  # no more options
      break
      ;;
   esac
@@ -160,7 +160,7 @@ fi
 
 if [[ "$RELEASE_PUBLISH" == "true"  ]]; then
     if [[ "$GIT_REF" && "$GIT_TAG" ]]; then
-        echo "ERROR: Only one argumented permitted when publishing : --gitCommitHash or --gitTag"
+        echo "ERROR: Only one argument permitted when publishing : --gitCommitHash or --gitTag"
         exit_with_usage
     fi
     if [[ -z "$GIT_REF" && -z "$GIT_TAG" ]]; then
@@ -175,11 +175,11 @@ if [[ "$RELEASE_PUBLISH" == "true" && "$DRY_RUN" ]]; then
 fi
 
 if [[ "$RELEASE_SNAPSHOT" == "true" && "$DRY_RUN" ]]; then
-    echo "ERROR: --dryRun not supported for --release-publish"
+    echo "ERROR: --dryRun not supported for --release-snapshot"
     exit_with_usage
 fi
 
-# Commit ref to checkout when building
+# commit ref to checkout when building
 GIT_REF=${GIT_REF:-master}
 if [[ "$RELEASE_PUBLISH" == "true" && "$GIT_TAG" ]]; then
     GIT_REF="tags/$GIT_TAG"
@@ -200,11 +200,10 @@ fi
 
 RELEASE_STAGING_LOCATION="https://dist.apache.org/repos/dist/dev/bahir/bahir-spark"
 
-
 echo "  "
-echo "-------------------------------------------------------------"
-echo "------- Release preparation with the following parameters ---"
-echo "-------------------------------------------------------------"
+echo "-----------------------------------------------------------------"
+echo "------- Release preparation with the following parameters -------"
+echo "-----------------------------------------------------------------"
 echo "Executing           ==> $GOAL"
 echo "Git reference       ==> $GIT_REF"
 echo "release version     ==> $RELEASE_VERSION"
@@ -220,7 +219,6 @@ echo $RELEASE_STAGING_LOCATION
 echo "  "
 
 function checkout_code {
-    # Checkout code
     rm -rf target
     mkdir target
     cd target
@@ -231,33 +229,41 @@ function checkout_code {
     git_hash=`git rev-parse --short HEAD`
     echo "Checked out Bahir git hash $git_hash"
 
-    cd "$BASE_DIR" #return to base dir
+    cd "$BASE_DIR" # return to base dir
 }
 
 if [[ "$RELEASE_PREPARE" == "true" ]]; then
     echo "Preparing release $RELEASE_VERSION"
-    # Checkout code
+    # checkout code
     checkout_code
     cd target/bahir
 
-    # Build and prepare the release
-    $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN -Darguments="-Dgpg.passphrase=\"$GPG_PASSPHRASE\" -DskipTests" -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG"
+    # test with scala 2.11 and 2.12
+    ./dev/change-scala-version.sh 2.11
+    $MVN $PUBLISH_PROFILES clean test -Dscala-2.11 || exit 1
+    ./dev/change-scala-version.sh 2.12
+    $MVN $PUBLISH_PROFILES clean test || exit 1
+
+    # build and prepare the release
+    $MVN $PUBLISH_PROFILES release:clean release:prepare $DRY_RUN \
+        -DskipTests=true -Dgpg.passphrase="$GPG_PASSPHRASE" \
+        -DreleaseVersion="$RELEASE_VERSION" -DdevelopmentVersion="$DEVELOPMENT_VERSION" -Dtag="$RELEASE_TAG"
 
-    cd .. #exit bahir
+    cd .. # exit bahir
 
     if [ -z "$DRY_RUN" ]; then
         cd "$BASE_DIR/target/bahir"
         git checkout $RELEASE_TAG
         git clean -d -f -x
 
-        $MVN $PUBLISH_PROFILES clean install -DskiptTests -Darguments="-DskipTests"
+        $MVN $PUBLISH_PROFILES clean install -DskipTests=true
 
         cd "$BASE_DIR/target"
         svn co $RELEASE_STAGING_LOCATION svn-bahir
         mkdir -p svn-bahir/$RELEASE_VERSION-$RELEASE_RC
 
         cp bahir/distribution/target/*.tar.gz svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
-        cp bahir/distribution/target/*.zip    svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
+        cp bahir/distribution/target/*.zip svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
 
         cd svn-bahir/$RELEASE_VERSION-$RELEASE_RC/
         rm -f *.asc
@@ -265,49 +271,50 @@ if [[ "$RELEASE_PREPARE" == "true" ]]; then
         rm -f *.sha*
         for i in *.zip *.tar.gz; do shasum --algorithm 512 $i > $i.sha512; done
 
-        cd .. #exit $RELEASE_VERSION-$RELEASE_RC/
+        cd .. # exit $RELEASE_VERSION-$RELEASE_RC
 
         svn add $RELEASE_VERSION-$RELEASE_RC/
         svn ci -m"Apache Bahir $RELEASE_VERSION-$RELEASE_RC"
     fi
 
-
-    cd "$BASE_DIR" #exit target
-
+    cd "$BASE_DIR" # exit target
     exit 0
 fi
 
 
 if [[ "$RELEASE_PUBLISH" == "true" ]]; then
     echo "Preparing release $RELEASE_VERSION"
-    # Checkout code
+    # checkout code
     checkout_code
     cd target/bahir
 
-    #Deploy default scala 2.11
-    mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE
-
-    #mvn clean
+    DEPLOYMENT_REPOSITORY="apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2"
 
-    #Deploy scala 2.10
-    #./dev/change-scala-version.sh 2.10
-    #mvn $PUBLISH_PROFILES -DaltDeploymentRepository=apache.releases.https::default::https://repository.apache.org/service/local/staging/deploy/maven2 clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE
+    # deploy default scala 2.12
+    $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+        -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+        -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE
 
-    cd "$BASE_DIR" #exit target
+    # deploy scala 2.11
+    ./dev/change-scala-version.sh 2.11
+    $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+        -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+        -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11
 
+    cd "$BASE_DIR" # exit target
     exit 0
 fi
 
 
 if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then
-    # Checkout code
+    # checkout code
     checkout_code
     cd target/bahir
 
-    CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version \
-    | grep -v INFO | grep -v WARNING | grep -v Download)
+    DEPLOYMENT_REPOSITORY="apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots"
+    CURRENT_VERSION=$($MVN help:evaluate -Dexpression=project.version | grep -v INFO | grep -v WARNING | grep -v Download)
 
-    # Publish Bahir Snapshots to Maven snapshot repo
+    # publish Bahir snapshots to maven repository
     echo "Deploying Bahir SNAPSHOT at '$GIT_REF' ($git_hash)"
     echo "Publish version is $CURRENT_VERSION"
     if [[ ! $CURRENT_VERSION == *"SNAPSHOT"* ]]; then
@@ -316,19 +323,23 @@ if [[ "$RELEASE_SNAPSHOT" == "true" ]]; then
         exit 1
     fi
 
-    #Deploy default scala 2.11
-    $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dgpg.passphrase=$GPG_PASSPHRASE
+    # deploy default scala 2.12
+    $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+        -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+        -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE
 
-    #Deploy scala 2.10
-    ./dev/change-scala-version.sh 2.10
-    $MVN $PUBLISH_PROFILES -DaltDeploymentRepository=apache.snapshots.https::default::https://repository.apache.org/content/repositories/snapshots clean package gpg:sign install:install deploy:deploy -DskiptTests -Darguments="-DskipTests" -Dscala-2.10 -Dgpg.passphrase=$GPG_PASSPHRASE
+    # deploy scala 2.11
+    ./dev/change-scala-version.sh 2.11
+    $MVN $PUBLISH_PROFILES clean package gpg:sign install:install deploy:deploy \
+        -DaltDeploymentRepository=$DEPLOYMENT_REPOSITORY \
+        -DskipTests=true -Dgpg.passphrase=$GPG_PASSPHRASE -Dscala-2.11
 
-    cd "$BASE_DIR" #exit target
+    cd "$BASE_DIR" # exit target
     exit 0
 fi
 
 
-cd "$BASE_DIR" #return to base dir
+cd "$BASE_DIR" # return to base directory
 rm -rf target
 echo "ERROR: wrong execution goals"
 exit_with_usage

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 18ba854..05a311f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,13 +20,13 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.bahir</groupId>
-        <artifactId>bahir-parent_2.11</artifactId>
+        <artifactId>bahir-parent_2.12</artifactId>
         <version>2.4.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-spark-distribution_2.11</artifactId>
+    <artifactId>bahir-spark-distribution_2.12</artifactId>
     <packaging>pom</packaging>
     <name>Apache Bahir - Spark Extensions Distribution</name>
     <url>http://bahir.apache.org/</url>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index af70a76..ec6b353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
     <version>18</version>
   </parent>
   <groupId>org.apache.bahir</groupId>
-  <artifactId>bahir-parent_2.11</artifactId>
+  <artifactId>bahir-parent_2.12</artifactId>
   <version>2.4.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Apache Bahir - Parent POM</name>
@@ -90,18 +90,19 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <skipTests>false</skipTests>
 
     <!-- General project dependencies version -->
     <java.version>1.8</java.version>
     <maven.version>3.3.9</maven.version>
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.12.7</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
 
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
 
     <!-- Spark version -->
-    <spark.version>2.3.2</spark.version>
+    <spark.version>2.4.0</spark.version>
 
     <!-- Hadoop version -->
     <hadoop.version>2.6.5</hadoop.version>
@@ -179,7 +180,7 @@
       published POMs are flattened and do not contain variables. Without this dependency, some
       subprojects' published POMs would contain variables like ${scala.binary.version} that will
       be substituted according to the default properties instead of the ones determined by the
-      profiles that were active during publishing, causing the Scala 2.10 build's POMs to have 2.11
+      profiles that were active during publishing, causing the Scala 2.11 build's POMs to have 2.12
       dependencies due to the incorrect substitutions. By ensuring that maven-shade runs for all
       subprojects, we eliminate this problem because the substitutions are baked into the final POM.
 
@@ -204,6 +205,17 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <!--
+      Temporary workaround. Spark Core 2.4.0 with Scala 2.12 depends on Apache Avro 1.8.2,
+      which pulls in transitive dependency of Paranamer 2.7. All integration tests fail
+      with error message same as SPARK-22128. Manually upgrading Paranamer to 2.8.
+    -->
+    <dependency>
+      <groupId>com.thoughtworks.paranamer</groupId>
+      <artifactId>paranamer</artifactId>
+      <version>2.8</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <dependencyManagement>
@@ -213,6 +225,11 @@
         <artifactId>spark-core_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <exclusions>
+          <!-- Temporary fix, see above. -->
+          <exclusion>
+            <groupId>com.thoughtworks.paranamer</groupId>
+            <artifactId>paranamer</artifactId>
+          </exclusion>
           <exclusion>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-plus</artifactId>
@@ -308,7 +325,7 @@
       <dependency>
         <groupId>org.json4s</groupId>
         <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-        <version>3.2.11</version>
+        <version>3.5.3</version>
       </dependency>
 
       <dependency>
@@ -982,6 +999,9 @@
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <skipTests>${skipTests}</skipTests>
+        </configuration>
       </plugin>
       <!-- Build test-jar's for all projects, since some projects depend on tests from others -->
       <plugin>
@@ -1040,33 +1060,20 @@
 
     <profile>
       <id>distribution</id>
-
       <modules>
         <module>distribution</module>
       </modules>
     </profile>
 
-    <!--
     <profile>
-      <id>scala-2.10</id>
+      <id>scala-2.11</id>
       <activation>
-        <property><name>scala-2.10</name></property>
+        <property><name>scala-2.11</name></property>
       </activation>
       <properties>
-        <scala.version>2.10.6</scala.version>
-        <scala.binary.version>2.10</scala.binary.version>
-        <jline.version>${scala.version}</jline.version>
-        <jline.groupid>org.scala-lang</jline.groupid>
+        <scala.version>2.11.12</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
       </properties>
-      <dependencyManagement>
-        <dependencies>
-          <dependency>
-            <groupId>${jline.groupid}</groupId>
-            <artifactId>jline</artifactId>
-            <version>${jline.version}</version>
-          </dependency>
-        </dependencies>
-      </dependencyManagement>
       <build>
         <plugins>
           <plugin>
@@ -1082,7 +1089,7 @@
                   <rules>
                     <bannedDependencies>
                       <excludes combine.children="append">
-                        <exclude>*:*_2.11</exclude>
+                        <exclude>*:*_2.12</exclude>
                       </excludes>
                     </bannedDependencies>
                   </rules>
@@ -1095,13 +1102,13 @@
     </profile>
 
     <profile>
-      <id>scala-2.11</id>
+      <id>scala-2.12</id>
       <activation>
-        <property><name>!scala-2.10</name></property>
+        <property><name>!scala-2.11</name></property>
       </activation>
       <properties>
-        <scala.version>2.11.8</scala.version>
-        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.12.7</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
       </properties>
       <build>
         <plugins>
@@ -1118,7 +1125,7 @@
                   <rules>
                     <bannedDependencies>
                       <excludes combine.children="append">
-                        <exclude>*:*_2.10</exclude>
+                        <exclude>*:*_2.11</exclude>
                       </excludes>
                     </bannedDependencies>
                   </rules>
@@ -1129,17 +1136,6 @@
         </plugins>
       </build>
     </profile>
-    -->
-
-    <profile>
-      <id>test-java-home</id>
-      <activation>
-        <property><name>env.JAVA_HOME</name></property>
-      </activation>
-      <properties>
-        <test.java.home>${env.JAVA_HOME}</test.java.home>
-      </properties>
-    </profile>
 
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/README.md
----------------------------------------------------------------------
diff --git a/sql-cloudant/README.md b/sql-cloudant/README.md
index b651990..d315d7f 100644
--- a/sql-cloudant/README.md
+++ b/sql-cloudant/README.md
@@ -37,7 +37,7 @@ Submit a job in Scala:
 
 	spark-submit --class "<your class>" --master local[4] --packages org.apache.bahir:spark-sql-cloudant__{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}} <path to spark-sql-cloudant jar>
 
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Configuration options	
 The configuration is obtained in the following sequence:

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/pom.xml
----------------------------------------------------------------------
diff --git a/sql-cloudant/pom.xml b/sql-cloudant/pom.xml
index d81232a..a14862b 100644
--- a/sql-cloudant/pom.xml
+++ b/sql-cloudant/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-sql-cloudant_2.11</artifactId>
+  <artifactId>spark-sql-cloudant_2.12</artifactId>
   <properties>
     <sbt.project.name>spark-sql-cloudant</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
index 47643cc..84babdd 100644
--- a/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
+++ b/sql-cloudant/src/main/scala/org/apache/bahir/cloudant/DefaultSource.scala
@@ -76,7 +76,7 @@ case class CloudantReadWriteRelation (config: CloudantConfig,
       logger.warn("Database " + config.getDbname +
         ": nothing was saved because the number of records was 0!")
     } else {
-      val result = data.toJSON.foreachPartition { x =>
+      data.toJSON.foreachPartition { x: Iterator[String] =>
         val list = x.toList // Has to pass as List, Iterator results in 0 data
         dataAccess.saveAll(list)
       }
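
The explicit Iterator[String] ascription added above is what keeps this call compiling after the Scala 2.12 upgrade: Dataset.foreachPartition is overloaded (a Scala Iterator[T] => Unit variant and a Java ForeachPartitionFunction[T] variant), and with 2.12's SAM conversion an untyped lambda no longer resolves to a single overload. A minimal sketch of the disambiguation, assuming a local SparkSession; the object and data here are illustrative, not Bahir code:

    import org.apache.spark.sql.SparkSession

    object ForeachPartitionDisambiguation {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
        val json = spark.range(3).toDF("id").toJSON   // Dataset[String]
        // json.foreachPartition(x => x.foreach(println))   // fails to resolve under Scala 2.12: two overloads match
        json.foreachPartition { x: Iterator[String] =>      // ascription picks the Scala Iterator overload
          x.foreach(println)
        }
        spark.stop()
      }
    }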

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/README.md b/sql-streaming-akka/README.md
index 29685ee..a29979b 100644
--- a/sql-streaming-akka/README.md
+++ b/sql-streaming-akka/README.md
@@ -22,7 +22,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/pom.xml b/sql-streaming-akka/pom.xml
index 98586c7..6d50325 100644
--- a/sql-streaming-akka/pom.xml
+++ b/sql-streaming-akka/pom.xml
@@ -20,13 +20,13 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.bahir</groupId>
-        <artifactId>bahir-parent_2.11</artifactId>
+        <artifactId>bahir-parent_2.12</artifactId>
         <version>2.4.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
     <groupId>org.apache.bahir</groupId>
-    <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+    <artifactId>spark-sql-streaming-akka_2.12</artifactId>
     <properties>
         <sbt.project.name>sql-streaming-akka</sbt.project.name>
     </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
index 3f2101c..f20a917 100644
--- a/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
+++ b/sql-streaming-akka/src/main/scala/org/apache/bahir/sql/streaming/akka/AkkaStreamSource.scala
@@ -41,10 +41,12 @@ import com.typesafe.config.ConfigFactory
 import org.rocksdb.{Options, RocksDB}
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.UTF8StringBuilder
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
@@ -262,7 +264,7 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
 
   override def readSchema(): StructType = AkkaStreamConstants.SCHEMA_DEFAULT
 
-  override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): util.List[InputPartition[InternalRow]] = {
     assert(startOffset != null && endOffset != null,
       "start offset and end offset should already be set before create read tasks.")
 
@@ -283,8 +285,9 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
 
     (0 until numPartitions).map { i =>
       val slice = slices(i)
-      new DataReaderFactory[Row] {
-        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+      new InputPartition[InternalRow] {
+        override def createPartitionReader(): InputPartitionReader[InternalRow] =
+            new InputPartitionReader[InternalRow] {
           private var currentIdx = -1
 
           override def next(): Boolean = {
@@ -292,8 +295,10 @@ class AkkaMicroBatchReader(urlOfPublisher: String,
             currentIdx < slice.size
           }
 
-          override def get(): Row = {
-            Row.fromTuple(slice(currentIdx))
+          override def get(): InternalRow = {
+            val builder = new UTF8StringBuilder()
+            builder.append(slice(currentIdx)._1)
+            InternalRow(builder.build(), slice(currentIdx)._2.getTime)
           }
 
           override def close(): Unit = {}
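
The bulk of this hunk tracks the DataSource V2 rename in Spark 2.4: DataReaderFactory/DataReader became InputPartition/InputPartitionReader, and micro-batch readers now produce InternalRow instead of Row, which is why the source builds its string column through UTF8StringBuilder. A minimal, self-contained sketch of the new read-side contract (class name and data are illustrative, not Bahir code):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
    import org.apache.spark.unsafe.types.UTF8String

    // One partition over an in-memory slice of strings; the reader walks the slice and
    // emits each element as a single-column InternalRow. Inside InternalRow, string
    // columns must be UTF8String, not java.lang.String.
    class StringSlicePartition(slice: Seq[String]) extends InputPartition[InternalRow] {
      override def createPartitionReader(): InputPartitionReader[InternalRow] =
        new InputPartitionReader[InternalRow] {
          private var currentIdx = -1
          override def next(): Boolean = { currentIdx += 1; currentIdx < slice.size }
          override def get(): InternalRow = InternalRow(UTF8String.fromString(slice(currentIdx)))
          override def close(): Unit = {}
        }
    }

A MicroBatchReader then returns one such partition per slice from planInputPartitions(), as the hunk above does for the Akka source.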

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/README.md b/sql-streaming-mqtt/README.md
index 721b544..0fbf63e 100644
--- a/sql-streaming-mqtt/README.md
+++ b/sql-streaming-mqtt/README.md
@@ -22,7 +22,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is compiled for Scala 2.11 only, and intends to support Spark 2.0 onwards.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/pom.xml b/sql-streaming-mqtt/pom.xml
index 3f818f8..53ccc32 100644
--- a/sql-streaming-mqtt/pom.xml
+++ b/sql-streaming-mqtt/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+  <artifactId>spark-sql-streaming-mqtt_2.12</artifactId>
   <properties>
     <sbt.project.name>sql-streaming-mqtt</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
index 8925e93..78eae52 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/CachedMQTTClient.scala
@@ -94,8 +94,16 @@ private[mqtt] object CachedMQTTClient extends Logging {
   private def closeMqttClient(params: Seq[(String, String)],
       client: MqttClient, persistence: MqttClientPersistence): Unit = {
     try {
-      client.disconnect()
-      persistence.close()
+      if (client.isConnected) {
+        client.disconnect()
+      }
+      try {
+        persistence.close()
+      } catch {
+        case NonFatal(e) => log.warn(
+          s"Error while closing MQTT persistent store ${e.getMessage}", e
+        )
+      }
       client.close()
     } catch {
       case NonFatal(e) => log.warn(s"Error while closing MQTT client ${e.getMessage}", e)
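
The reworked closeMqttClient above closes each resource in its own guarded block, so a failure while closing the persistence store no longer prevents the client itself from being closed. A tiny sketch of that pattern with a hypothetical closeQuietly helper (not part of Bahir):

    import scala.util.control.NonFatal

    object CloseQuietlySketch {
      // Each resource gets its own try so a failure closing one does not skip the rest.
      def closeQuietly(label: String)(close: => Unit): Unit =
        try close catch {
          case NonFatal(e) => Console.err.println(s"Error closing $label: ${e.getMessage}")
        }

      def main(args: Array[String]): Unit = {
        closeQuietly("persistence") { throw new RuntimeException("store already gone") } // logged, not rethrown
        closeQuietly("client") { /* closes normally */ }
      }
    }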

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
index 846765c..23385f4 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSink.scala
@@ -17,18 +17,17 @@
 
 package org.apache.bahir.sql.streaming.mqtt
 
-import java.nio.charset.Charset
-
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.eclipse.paho.client.mqttv3.MqttException
 
 import org.apache.spark.SparkEnv
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.sources.{PackedRowCommitMessage, PackedRowWriterFactory}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -39,67 +38,50 @@ import org.apache.bahir.utils.Retry
 
 class MQTTStreamWriter (schema: StructType, parameters: DataSourceOptions)
     extends StreamWriter with Logging {
-  private lazy val publishAttempts: Int =
-    SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1)
-  private lazy val publishBackoff: Long =
-    SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")
+  override def createWriterFactory(): DataWriterFactory[InternalRow] = {
+    // Skipping client identifier as single batch can be distributed to multiple
+    // Spark worker process. MQTT server does not support two connections
+    // declaring same client ID at given point in time.
+    val params = parameters.asMap().asScala.filterNot(
+      _._1.equalsIgnoreCase("clientId")
+    )
+    MQTTDataWriterFactory(params)
+  }
 
-  assert(SparkSession.getActiveSession.isDefined)
-  private val spark = SparkSession.getActiveSession.get
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 
-  private var topic: String = _
-  private var qos: Int = -1
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
 
-  initialize()
-  private def initialize(): Unit = {
-    val (_, _, topic_, _, _, qos_, _, _, _) = MQTTUtils.parseConfigParams(
-      collection.immutable.HashMap() ++ parameters.asMap().asScala
-    )
-    topic = topic_
-    qos = qos_
-  }
+case class MQTTDataWriterFactory(config: mutable.Map[String, String])
+    extends DataWriterFactory[InternalRow] {
+  override def createDataWriter(
+    partitionId: Int, taskId: Long, epochId: Long
+  ): DataWriter[InternalRow] = new MQTTDataWriter(config)
+}
 
-  override def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
+case object MQTTWriterCommitMessage extends WriterCommitMessage
 
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
-    commit(messages)
-  }
+class MQTTDataWriter(config: mutable.Map[String, String]) extends DataWriter[InternalRow] {
+  private lazy val publishAttempts: Int =
+    SparkEnv.get.conf.getInt("spark.mqtt.client.publish.attempts", -1)
+  private lazy val publishBackoff: Long =
+    SparkEnv.get.conf.getTimeAsMs("spark.mqtt.client.publish.backoff", "5s")
 
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    val rows = messages.collect {
-      case PackedRowCommitMessage(rs) => rs
-    }.flatten
+  private lazy val (_, _, topic, _, _, qos, _, _, _) = MQTTUtils.parseConfigParams(config.toMap)
 
-    // Skipping client identifier as single batch can be distributed to multiple
-    // Spark worker process. MQTT server does not support two connections
-    // declaring same client ID at given point in time.
-    val params_ = Seq() ++ parameters.asMap().asScala.toSeq.filterNot(
-      _._1.equalsIgnoreCase("clientId")
-    )
-    // IMPL Note: Had to declare new value reference due to serialization requirements.
-    val topic_ = topic
-    val qos_ = qos
-    val publishAttempts_ = publishAttempts
-    val publishBackoff_ = publishBackoff
-
-    val data = spark.createDataFrame(rows.toList.asJava, schema)
-    data.foreachPartition (
-      iterator => iterator.foreach(
-        row => {
-          val client = CachedMQTTClient.getOrCreate(params_.toMap)
-          val message = row.mkString.getBytes(Charset.defaultCharset())
-          Retry(publishAttempts_, publishBackoff_, classOf[MqttException]) {
-            // In case of errors, retry sending the message.
-            client.publish(topic_, message, qos_, false)
-          }
-        }
-      )
-    )
+  override def write(record: InternalRow): Unit = {
+    val client = CachedMQTTClient.getOrCreate(config.toMap)
+    val message = record.getBinary(0)
+    Retry(publishAttempts, publishBackoff, classOf[MqttException]) {
+      // In case of errors, retry sending the message.
+      client.publish(topic, message, qos, false)
+    }
   }
 
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def commit(): WriterCommitMessage = MQTTWriterCommitMessage
 
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(): Unit = {}
 }
 
 case class MQTTRelation(override val sqlContext: SQLContext, data: DataFrame)
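
On the write side the sink no longer collects rows back on the driver through PackedRowCommitMessage; it implements the Spark 2.4 DataWriterFactory/DataWriter contract directly, so each record is published from the executor inside write() and commit()/abort() are left as no-ops. A minimal sketch of that lifecycle under the same API (the counting writer is purely illustrative):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

    case object NoopCommitMessage extends WriterCommitMessage

    // The factory is serialized to executors; Spark calls createDataWriter once per
    // partition/task/epoch and then feeds every record of that partition to write().
    class CountingWriterFactory extends DataWriterFactory[InternalRow] {
      override def createDataWriter(partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
        new DataWriter[InternalRow] {
          private var count = 0L
          override def write(record: InternalRow): Unit = count += 1   // per-record side effect goes here
          override def commit(): WriterCommitMessage = NoopCommitMessage
          override def abort(): Unit = ()                              // nothing buffered to roll back
        }
    }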

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
index 7146ecc..e1314ae 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.bahir.sql.streaming.mqtt
 
+import java.{util => jutil}
 import java.nio.charset.Charset
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
@@ -31,9 +32,10 @@ import scala.collection.mutable.ListBuffer
 import org.eclipse.paho.client.mqttv3._
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.types._
 
@@ -169,7 +171,7 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
     MQTTStreamConstants.SCHEMA_DEFAULT
   }
 
-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): jutil.List[InputPartition[InternalRow]] = {
     val rawList: IndexedSeq[MQTTMessage] = synchronized {
       val sliceStart = LongOffset.convert(startOffset).get.offset + 1
       val sliceEnd = LongOffset.convert(endOffset).get.offset + 1
@@ -186,8 +188,9 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
 
     (0 until numPartitions).map { i =>
       val slice = slices(i)
-      new DataReaderFactory[Row] {
-        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
+      new InputPartition[InternalRow] {
+        override def createPartitionReader(): InputPartitionReader[InternalRow] =
+            new InputPartitionReader[InternalRow] {
           private var currentIdx = -1
 
           override def next(): Boolean = {
@@ -195,8 +198,8 @@ class MQTTStreamSource(options: DataSourceOptions, brokerUrl: String, persistenc
             currentIdx < slice.size
           }
 
-          override def get(): Row = {
-            Row(slice(currentIdx).id, slice(currentIdx).topic,
+          override def get(): InternalRow = {
+            InternalRow(slice(currentIdx).id, slice(currentIdx).topic,
               slice(currentIdx).payload, slice(currentIdx).timestamp)
           }
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
index e6e202b..fd39557 100644
--- a/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
+++ b/sql-streaming-mqtt/src/main/scala/org/apache/spark/sql/mqtt/HdfsBasedMQTTStreamSource.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.mqtt
 
 import java.io.IOException
-import java.sql.Timestamp
 import java.util.Calendar
 import java.util.concurrent.locks.{Lock, ReentrantLock}
 
@@ -28,10 +27,9 @@ import org.eclipse.paho.client.mqttv3._
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.HDFSMetadataLog.FileContextManager
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -191,7 +189,7 @@ class HdfsBasedMQTTStreamSource(
     // recover message data file offset from hdfs
     val dataPath = new Path(receivedDataPath)
     if (fs.exists(dataPath)) {
-      val fileManager = new FileContextManager(dataPath, hadoopConfig)
+      val fileManager = CheckpointFileManager.create(dataPath, hadoopConfig)
       val dataFileIndexs = fileManager.list(dataPath, new PathFilter {
         private def isBatchFile(path: Path) = {
           try {

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
index ecdd942..ab24cb3 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSinkSuite.scala
@@ -22,19 +22,17 @@ import java.net.ConnectException
 import java.util
 
 import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttException
 import org.scalatest.BeforeAndAfter
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future
 
-import org.apache.spark.{SharedSparkContext, SparkEnv, SparkException, SparkFunSuite}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.{SharedSparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.sources.PackedRowCommitMessage
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 import org.apache.bahir.utils.FileHelper
 
@@ -56,7 +54,8 @@ class MQTTStreamSinkSuite(_ssl: Boolean) extends SparkFunSuite
   }
 
   after {
-    testClient.disconnect()
+    CachedMQTTClient.clear()
+    testClient.disconnectForcibly()
     testClient.close()
     mqttTestUtils.teardown()
     FileHelper.deleteFileQuietly(tempDir)
@@ -95,15 +94,12 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
       "topic" -> "test",
       "localStorage" -> tempDir.getAbsoluteFile.toString
     )
-    val schema = StructType(StructField("value", StringType) :: Nil)
-    val messages : Array[Row] = Array(new GenericRowWithSchema(Array("value1"), schema))
-    val thrown: Exception = intercept[SparkException] {
+    val thrown: Exception = intercept[MqttException] {
       provider.createStreamWriter(
-        "query1", schema, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
-      ).commit(1, Array(PackedRowCommitMessage(messages)))
+        "query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
+      ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
     }
-    // SparkException -> MqttException -> ConnectException
-    assert(thrown.getCause.getCause.isInstanceOf[ConnectException])
+    assert(thrown.getCause.isInstanceOf[ConnectException])
   }
 
   test("basic usage") {
@@ -111,7 +107,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
     val msg2 = "MQTT is a message queue."
     val (_, dataFrame) = createContextAndDF(msg1, msg2)
 
-    sendToMQTT(dataFrame).awaitTermination(3000)
+    sendToMQTT(dataFrame).awaitTermination(5000)
 
     assert(Set(msg1, msg2).equals(messages.values.toSet))
   }
@@ -120,7 +116,7 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
     val msg = List.tabulate(100)(n => "Hello, World!" + n)
     val (_, dataFrame) = createContextAndDF(msg: _*)
 
-    sendToMQTT(dataFrame).awaitTermination(3000)
+    sendToMQTT(dataFrame).awaitTermination(5000)
 
     assert(Set(msg: _*).equals(messages.values.toSet))
   }
@@ -134,13 +130,13 @@ class BasicMQTTSinkSuite extends MQTTStreamSinkSuite(false) {
     intercept[IllegalArgumentException] {
       provider.createStreamWriter(
         "query1", null, OutputMode.Complete(), new DataSourceOptions(parameters.asJava)
-      )
+      ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
     }
     intercept[IllegalArgumentException] {
       provider.createStreamWriter(
         "query1", null, OutputMode.Complete(),
         new DataSourceOptions(new util.HashMap[String, String])
-      )
+      ).createWriterFactory().createDataWriter(1, 1, 1).write(null)
     }
   }
 }
@@ -163,7 +159,7 @@ class StressTestMQTTSink extends MQTTStreamSinkSuite(false) {
     val freeMemory: Long = Runtime.getRuntime.freeMemory()
     log.info(s"Available memory before test run is ${freeMemory / (1024 * 1024)}MB.")
     val noOfMsgs: Int = 200
-    val noOfBatches: Int = 10
+    val noOfBatches: Int = 5
 
     val messageBuilder = new StringBuilder()
     for (i <- 0 until (500 * 1024)) yield messageBuilder.append(((i % 26) + 65).toChar)

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
index c4e340c..39cf0df 100644
--- a/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
+++ b/sql-streaming-mqtt/src/test/scala/org/apache/bahir/sql/streaming/mqtt/MQTTStreamSourceSuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.time.Span
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery}
 
@@ -157,15 +158,15 @@ class BasicMQTTSourceSuite extends MQTTStreamSourceSuite {
     // Clear in-memory cache to simulate recovery.
     source.messages.clear()
     source.setOffsetRange(Optional.empty(), Optional.empty())
-    var message: Row = null
-    for (f <- source.createDataReaderFactories().asScala) {
-      val dataReader = f.createDataReader()
+    var message: InternalRow = null
+    for (f <- source.planInputPartitions().asScala) {
+      val dataReader = f.createPartitionReader()
       if (dataReader.next()) {
         message = dataReader.get()
       }
     }
     source.commit(source.getCurrentOffset)
-    assert(payload == new String(message.getAs[Array[Byte]](2), "UTF-8"))
+    assert(payload == new String(message.getBinary(2), "UTF-8"))
   }
 
   test("no server up") {
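
Because the reader now hands back InternalRow, the test reads columns positionally with typed getters (getBinary, getUTF8String, getLong) instead of Row.getAs. A small REPL-style sketch of those accessors, with made-up column values, runnable when pasted into spark-shell:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // Columns: id (long), topic (string stored as UTF8String), payload (binary)
    val row = InternalRow(1L, UTF8String.fromString("sensors/temp"), "21.5C".getBytes("UTF-8"))
    val payload = new String(row.getBinary(2), "UTF-8")   // "21.5C"
    val topic   = row.getUTF8String(1).toString           // "sensors/temp"
    val id      = row.getLong(0)                          // 1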

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/README.md
----------------------------------------------------------------------
diff --git a/streaming-akka/README.md b/streaming-akka/README.md
index f57583e..bff9c25 100644
--- a/streaming-akka/README.md
+++ b/streaming-akka/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-akka/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-akka/pom.xml b/streaming-akka/pom.xml
index 5b94c7a..0d4e42f 100644
--- a/streaming-akka/pom.xml
+++ b/streaming-akka/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-streaming-akka_2.11</artifactId>
+  <artifactId>spark-streaming-akka_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-akka</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/README.md
----------------------------------------------------------------------
diff --git a/streaming-mqtt/README.md b/streaming-mqtt/README.md
index 05542de..811f822 100644
--- a/streaming-mqtt/README.md
+++ b/streaming-mqtt/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Configuration options.
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-mqtt/pom.xml b/streaming-mqtt/pom.xml
index 0f6d809..44ed1e0 100644
--- a/streaming-mqtt/pom.xml
+++ b/streaming-mqtt/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-streaming-mqtt_2.11</artifactId>
+  <artifactId>spark-streaming-mqtt_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-mqtt</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubnub/pom.xml b/streaming-pubnub/pom.xml
index 464cfce..683cb29 100644
--- a/streaming-pubnub/pom.xml
+++ b/streaming-pubnub/pom.xml
@@ -19,13 +19,13 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <groupId>org.apache.bahir</groupId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>spark-streaming-pubnub_2.11</artifactId>
+  <artifactId>spark-streaming-pubnub_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-pubnub</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-pubsub/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-pubsub/pom.xml b/streaming-pubsub/pom.xml
index f6ecd37..3f92983 100644
--- a/streaming-pubsub/pom.xml
+++ b/streaming-pubsub/pom.xml
@@ -19,14 +19,14 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <groupId>org.apache.bahir</groupId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-streaming-pubsub_2.11</artifactId>
+  <artifactId>spark-streaming-pubsub_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-pubsub</sbt.project.name>
   </properties>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/README.md
----------------------------------------------------------------------
diff --git a/streaming-twitter/README.md b/streaming-twitter/README.md
index 4123ea9..1703606 100644
--- a/streaming-twitter/README.md
+++ b/streaming-twitter/README.md
@@ -23,7 +23,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 
 ## Examples

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-twitter/pom.xml b/streaming-twitter/pom.xml
index 2bf29b5..f031771 100644
--- a/streaming-twitter/pom.xml
+++ b/streaming-twitter/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-streaming-twitter_2.11</artifactId>
+  <artifactId>spark-streaming-twitter_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-twitter</sbt.project.name>
   </properties>
@@ -72,7 +72,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>algebird-core_${scala.binary.version}</artifactId>
-      <version>0.11.0</version>
+      <version>0.12.4</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/README.md
----------------------------------------------------------------------
diff --git a/streaming-zeromq/README.md b/streaming-zeromq/README.md
index 8ced539..8d57424 100644
--- a/streaming-zeromq/README.md
+++ b/streaming-zeromq/README.md
@@ -24,7 +24,7 @@ For example, to include it when starting the spark shell:
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
 
-This library is cross-published for Scala 2.10 and Scala 2.11, so users should replace the proper Scala version (2.10 or 2.11) in the commands listed above.
+This library is cross-published for Scala 2.11 and Scala 2.12, so users should replace the proper Scala version in the commands listed above.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/bahir/blob/c51853d1/streaming-zeromq/pom.xml
----------------------------------------------------------------------
diff --git a/streaming-zeromq/pom.xml b/streaming-zeromq/pom.xml
index 5aaf2fa..ec1c4a7 100644
--- a/streaming-zeromq/pom.xml
+++ b/streaming-zeromq/pom.xml
@@ -20,13 +20,13 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.bahir</groupId>
-    <artifactId>bahir-parent_2.11</artifactId>
+    <artifactId>bahir-parent_2.12</artifactId>
     <version>2.4.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
   <groupId>org.apache.bahir</groupId>
-  <artifactId>spark-streaming-zeromq_2.11</artifactId>
+  <artifactId>spark-streaming-zeromq_2.12</artifactId>
   <properties>
     <sbt.project.name>streaming-zeromq</sbt.project.name>
   </properties>