Posted to commits@bahir.apache.org by lr...@apache.org on 2017/07/17 17:19:59 UTC

[3/4] bahir-website git commit: Update documentation for Bahir for Spark extensions

Update documentation for Bahir for Spark extensions

Add templates for new extensions
Update script to process new extensions
Generate documentation for release 2.1.1 and current


Project: http://git-wip-us.apache.org/repos/asf/bahir-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-website/commit/21c1d626
Tree: http://git-wip-us.apache.org/repos/asf/bahir-website/tree/21c1d626
Diff: http://git-wip-us.apache.org/repos/asf/bahir-website/diff/21c1d626

Branch: refs/heads/master
Commit: 21c1d62684bc2e1bfe9c78c85f14372b7c1cf3c6
Parents: 6ac0428
Author: Luciano Resende <lr...@apache.org>
Authored: Mon Jul 17 10:12:59 2017 -0700
Committer: Luciano Resende <lr...@apache.org>
Committed: Mon Jul 17 10:12:59 2017 -0700

----------------------------------------------------------------------
 site/docs/spark/2.1.1/documentation.md          |  56 ++++
 site/docs/spark/2.1.1/spark-sql-cloudant.md     | 316 +++++++++++++++++++
 .../spark/2.1.1/spark-sql-streaming-akka.md     | 137 ++++++++
 .../spark/2.1.1/spark-sql-streaming-mqtt.md     | 146 +++++++++
 site/docs/spark/2.1.1/spark-streaming-akka.md   |  89 ++++++
 site/docs/spark/2.1.1/spark-streaming-mqtt.md   |  98 ++++++
 site/docs/spark/2.1.1/spark-streaming-pubsub.md |  71 +++++
 .../docs/spark/2.1.1/spark-streaming-twitter.md |  74 +++++
 site/docs/spark/2.1.1/spark-streaming-zeromq.md |  65 ++++
 site/docs/spark/current/documentation.md        |  12 +-
 site/docs/spark/current/spark-sql-cloudant.md   | 316 +++++++++++++++++++
 .../spark/current/spark-sql-streaming-akka.md   | 137 ++++++++
 .../spark/current/spark-sql-streaming-mqtt.md   |  13 +-
 site/docs/spark/current/spark-streaming-akka.md |   8 +-
 site/docs/spark/current/spark-streaming-mqtt.md |  14 +-
 .../spark/current/spark-streaming-pubsub.md     |  71 +++++
 .../spark/current/spark-streaming-twitter.md    |  12 +-
 .../spark/current/spark-streaming-zeromq.md     |  10 +-
 site/docs/spark/overview.md                     |   3 +-
 .../spark/templates/spark-sql-cloudant.template |  26 ++
 .../templates/spark-sql-streaming-akka.template |  26 ++
 .../templates/spark-streaming-pubsub.template   |  26 ++
 update-doc.sh                                   |   3 +
 23 files changed, 1700 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/documentation.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/documentation.md b/site/docs/spark/2.1.1/documentation.md
new file mode 100644
index 0000000..7eb03c6
--- /dev/null
+++ b/site/docs/spark/2.1.1/documentation.md
@@ -0,0 +1,56 @@
+---
+layout: page
+title: Extensions for Apache Spark
+description: Extensions for Apache Spark
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+### Apache Bahir Extensions for Apache Spark
+
+<br/>
+
+#### SQL  Data Sources
+
+[Apache CouchDB/Cloudant data source](../spark-sql-cloudant)
+
+<br/>
+
+#### Structured Streaming Data Sources
+
+[Akka data source](../spark-sql-streaming-akka)
+
+[MQTT data source](../spark-sql-streaming-mqtt)
+
+<br/>
+
+#### Discretized Streams (DStreams) Connectors
+
+[Akka connector](../spark-streaming-akka)
+
+[Google Cloud Pub/Sub connector](../spark-streaming-pubsub)
+
+[MQTT connector](../spark-streaming-mqtt)
+
+[Twitter connector](../spark-streaming-twitter)
+
+[ZeroMQ connector](../spark-streaming-zeromq)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-sql-cloudant.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-sql-cloudant.md b/site/docs/spark/2.1.1/spark-sql-cloudant.md
new file mode 100644
index 0000000..7d256a5
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-sql-cloudant.md
@@ -0,0 +1,316 @@
+---
+layout: page
+title: Spark Data Source for Apache CouchDB/Cloudant
+description: Spark Data Source for Apache CouchDB/Cloudant
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming.
+
+[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a Service (DBaaS). It stores data as documents
+in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a
+wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and
+geospatial indexing. The replication capabilities make it easy to keep data in sync between database
+clusters, desktop PCs, and mobile devices.
+
+[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-cloudant_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+Submit a job in Python:
+
+    spark-submit  --master local[4] --jars <path to cloudant-spark.jar>  <path to python script>
+
+Submit a job in Scala:
+
+	spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark.jar> <path to your app jar>
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+
+## Configuration options
+The configuration is obtained in the following sequence:
+
+1. default in the Config, which is set in application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in dataframe or temporary table options
+4. "spark."+key in the SparkConf (as these are treated as the keys passed in through spark-submit using the --conf option)
+
+Here each subsequent configuration overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf, and configuration passed to spark-submit using --conf takes precedence over any setting in the code.
+
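+As an illustration of this precedence, here is a minimal sketch (the host, credentials, and database are placeholders; `schemaSampleSize` is one of the keys described below) that sets the same key both in SparkConf and as a DataFrame option:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession
+  .builder()
+  .appName("Cloudant configuration precedence sketch")
+  .config("cloudant.host", "ACCOUNT.cloudant.com")
+  .config("cloudant.username", "USERNAME")
+  .config("cloudant.password", "PASSWORD")
+  .config("schemaSampleSize", "-1")   // set in SparkConf (level 2)
+  .getOrCreate()
+
+// The DataFrame option (level 3) overrides the SparkConf value above, and
+// `spark-submit --conf spark.schemaSampleSize=...` (level 4) would override both.
+val df = spark.read.format("org.apache.bahir.cloudant")
+  .option("schemaSampleSize", "100")
+  .load("n_airportcodemapping")
+```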
+
+### Configuration in application.conf
+Default values are defined [here](cloudant-spark-sql/src/main/resources/application.conf).
+
+### Configuration on SparkConf
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore.
+cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
+jsonstore.rdd.partitions|10|the number of partitions intended to be used to drive JsonStoreRDD to load query results in parallel. The actual number is calculated based on the total rows returned, while satisfying maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
+schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means only the first document is used for schema discovery; -1 means all documents; 0 will be treated as 1; any number N means min(N, total) docs
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+
+### Configuration on Spark SQL Temporary Table or DataFrame
+
+Besides all the configurations passed to a temporary table or dataframe through SparkConf, it is also possible to set the following configurations on a temporary table or dataframe using OPTIONS:
+
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view w/o the database name. only used for load.
+index||cloudant search index w/o the database name. only used for load data with less than or equal to 200 results.
+path||cloudant: as database name if database is not present
+schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents
+bulkSize|200| the bulk save size
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised.
+
+For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
+
+```
+
+### Configuration on Cloudant Receiver for Spark Streaming
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+### Configuration in spark-submit using --conf option
+
+The configuration keys stated above can also be set using the `spark-submit --conf` option. When passing configuration in spark-submit, make sure to add the "spark." prefix to the keys.
+
+
+## Examples
+
+### Python API
+
+#### Using SQL In Python
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+
+# Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+for code in airportData.collect():
+    print(code._id)
+```
+
+See [CloudantApp.py](examples/python/CloudantApp.py) for examples.
+
+Submit job example:
+```
+spark-submit  --packages org.apache.bahir:spark-sql-cloudant_2.11:2.1.1 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
+```
+
+#### Using DataFrame In Python
+
+```python
+from pyspark.sql import SparkSession
+
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache()
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()	    
+```
+
+See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
+
+When performing multiple operations on a dataframe (select, filter, etc.),
+you should persist it. Otherwise, every operation on the dataframe will load the same data from Cloudant again.
+Persisting will also speed up computation. This statement persists the dataframe in memory: `df.cache()`. Alternatively, for large databases, to persist in memory and on disk, use:
+
+```python
+from pyspark import StorageLevel
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```
+
+[Sample code](examples/python/CloudantDFOption.py) on using DataFrame options to define Cloudant configuration.
+
+### Scala API
+
+#### Using SQL In Scala
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .getOrCreate()
+
+// For implicit conversions of Dataframe to RDDs
+import spark.implicits._
+
+// create a temp table from Cloudant db and query it using sql syntax
+spark.sql(
+    s"""
+    |CREATE TEMPORARY TABLE airportTable
+    |USING org.apache.bahir.cloudant
+    |OPTIONS ( database 'n_airportcodemapping')
+    """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert dataframe to array of Rows, and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
+```
+See [CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala) for examples.
+
+Submit job example:
+```
+spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.1.1 --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD  /path/to/spark-sql-cloudant_2.11-2.1.1-tests.jar
+```
+
+#### Using DataFrame In Scala
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .config("createDBOnSave","true") // to create a db on save
+      .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+      .getOrCreate()
+
+// 1. Loading data from Cloudant db
+val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+// Caching df in memory to speed computations
+// and not to retrieve data from cloudant again
+df.cache()
+df.printSchema()
+
+// 2. Saving dataframe to Cloudant db
+val df2 = df.filter(df("flightSegmentId") === "AA106")
+    .select("flightSegmentId","economyClassBaseCost")
+df2.show()
+df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
+```
+
+See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples.
+
+[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on using DataFrame option to define Cloudant configuration.
+
+
+#### Using Streams In Scala
+
+```scala
+val ssc = new StreamingContext(sparkConf, Seconds(10))
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "n_airportcodemapping")))
+
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  // Get the singleton instance of SparkSession
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+  println(s"========= $time =========")
+  // Convert RDD[String] to DataFrame
+  val changesDataFrame = spark.read.json(rdd)
+  if (!changesDataFrame.schema.isEmpty) {
+    changesDataFrame.printSchema()
+    changesDataFrame.select("*").show()
+    ....
+  }
+})
+ssc.start()
+// run streaming for 120 secs
+Thread.sleep(120000L)
+ssc.stop(true)
+
+```
+
+See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples.
+
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to
+specific documents, use the `selector` option of `CloudantReceiver` and specify your conditions
+(See [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
+example for more details):
+
+```scala
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "sales",
+  "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
+```

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-sql-streaming-akka.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-sql-streaming-akka.md b/site/docs/spark/2.1.1/spark-sql-streaming-akka.md
new file mode 100644
index 0000000..61fb92e
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-sql-streaming-akka.md
@@ -0,0 +1,137 @@
+---
+layout: page
+title: Spark Structured Streaming Akka
+description: Spark Structured Streaming Akka
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL Stream can be created with data streams received from an Akka Feeder actor as follows:
+
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .load()
+
+## Enabling recovery from failures
+
+Setting a value for the option `persistenceDirPath` enables recovery after a restart by restoring the state from where it left off before the shutdown.
+
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .option("persistenceDirPath", "/path/to/localdir")
+                .load()
+
+## Configuration options
+
+This source uses the [Akka Actor API](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+
+* `urlOfPublisher` The URL of the Publisher or Feeder actor that the Receiver actor connects to. Set this to the TCP URL of the Publisher or Feeder actor.
+* `persistenceDirPath` The directory path used for storing incoming messages on disk, which enables recovery after a restart.
+
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream:
+
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        val lines = spark.readStream
+                    .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                    .option("urlOfPublisher", urlOfPublisher)
+                    .load().as[(String, Timestamp)]
+
+        // Split the lines into words
+        val words = lines.map(_._1).flatMap(_.split(" "))
+
+        // Generate running word count
+        val wordCounts = words.groupBy("value").count()
+
+        // Start running the query that prints the running counts to the console
+        val query = wordCounts.writeStream
+                    .outputMode("complete")
+                    .format("console")
+                    .start()
+
+        query.awaitTermination()
+
+Please see `AkkaStreamWordCount.scala` for a full example.
+
+### Java API
+
+An example using the Java API to count words from an incoming message stream:
+
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        Dataset<String> lines = spark
+                                .readStream()
+                                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                                .option("urlOfPublisher", urlOfPublisher)
+                                .load().select("value").as(Encoders.STRING());
+
+        // Split the lines into words
+        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+          @Override
+          public Iterator<String> call(String s) throws Exception {
+            return Arrays.asList(s.split(" ")).iterator();
+          }
+        }, Encoders.STRING());
+
+        // Generate running word count
+        Dataset<Row> wordCounts = words.groupBy("value").count();
+
+        // Start running the query that prints the running counts to the console
+        StreamingQuery query = wordCounts.writeStream()
+                                .outputMode("complete")
+                                .format("console")
+                                .start();
+
+        query.awaitTermination();   
+
+Please see `JavaAkkaStreamWordCount.java` for a full example.

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-sql-streaming-mqtt.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-sql-streaming-mqtt.md b/site/docs/spark/2.1.1/spark-sql-streaming-mqtt.md
new file mode 100644
index 0000000..a969feb
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-sql-streaming-mqtt.md
@@ -0,0 +1,146 @@
+---
+layout: page
+title: Spark Structured Streaming MQTT
+description: Spark Structured Streaming MQTT
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading data from MQTT servers using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL Stream can be created with data streams received through an MQTT server as follows:
+
+    sqlContext.readStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+        .option("topic", "mytopic")
+        .load("tcp://localhost:1883")
+
+## Enabling recovery from failures
+
+Setting values for the options `localStorage` and `clientId` enables recovery after a restart by restoring the state from where it left off before the shutdown.
+
+    sqlContext.readStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+        .option("topic", "mytopic")
+        .option("localStorage", "/path/to/localdir")
+        .option("clientId", "some-client-id")
+        .load("tcp://localhost:1883")
+
+## Configuration options
+
+This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` The URL the MqttClient connects to. Set this or `path` to the URL of the MQTT server, e.g. tcp://localhost:1883.
+ * `persistence` By default, incoming messages are persisted on disk. If `memory` is provided as the value for this option, then recovery on restart is not supported.
+ * `topic` The topic the MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting this to true starts a clean session and removes all messages checkpointed by a previous run of this source. It is set to false by default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is interpreted as wait until the client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
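+As a sketch of how several of these options can be combined (the broker URL, topic, client id, and credentials below are placeholders):
+
+    val lines = sqlContext.readStream
+        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+        .option("topic", "mytopic")
+        .option("clientId", "some-client-id")
+        .option("username", "USERNAME")
+        .option("password", "PASSWORD")
+        .option("QoS", "1")
+        .option("cleanSession", "false")
+        .load("tcp://localhost:1883")
+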
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream:
+
+    // Create DataFrame representing the stream of input lines from connection to mqtt server
+    val lines = spark.readStream
+      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+      .option("topic", topic)
+      .load(brokerUrl).as[(String, Timestamp)]
+
+    // Split the lines into words
+    val words = lines.map(_._1).flatMap(_.split(" "))
+
+    // Generate running word count
+    val wordCounts = words.groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    query.awaitTermination()
+
+Please see `MQTTStreamWordCount.scala` for a full example.
+
+### Java API
+
+An example using the Java API to count words from an incoming message stream:
+
+    // Create DataFrame representing the stream of input lines from connection to mqtt server.
+    Dataset<String> lines = spark
+            .readStream()
+            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+            .option("topic", topic)
+            .load(brokerUrl).select("value").as(Encoders.STRING());
+
+    // Split the lines into words
+    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+        @Override
+        public Iterator<String> call(String x) {
+            return Arrays.asList(x.split(" ")).iterator();
+        }
+    }, Encoders.STRING());
+
+    // Generate running word count
+    Dataset<Row> wordCounts = words.groupBy("value").count();
+
+    // Start running the query that prints the running counts to the console
+    StreamingQuery query = wordCounts.writeStream()
+            .outputMode("complete")
+            .format("console")
+            .start();
+
+    query.awaitTermination();
+
+Please see `JavaMQTTStreamWordCount.java` for a full example.

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-streaming-akka.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-streaming-akka.md b/site/docs/spark/2.1.1/spark-streaming-akka.md
new file mode 100644
index 0000000..11561ae
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-streaming-akka.md
@@ -0,0 +1,89 @@
+---
+layout: page
+title: Spark Streaming Akka
+description: Spark Streaming Akka
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading data from Akka Actors using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-akka_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Examples
+
+DStreams can be created with data streams received through Akka actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`.
+
+### Scala API
+
+You need to extend `ActorReceiver` so as to store received data into Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+    class CustomActor extends ActorReceiver {
+      def receive = {
+        case data: String => store(data)
+      }
+    }
+
+    // A new input stream can be created with this custom actor as
+    val ssc: StreamingContext = ...
+    val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
+
+### Java API
+
+You need to extend `JavaActorReceiver` so as to store received data into Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+    class CustomActor extends JavaActorReceiver {
+        @Override
+        public void onReceive(Object msg) throws Exception {
+            store((String) msg);
+        }
+    }
+
+    // A new input stream can be created with this custom actor as
+    JavaStreamingContext jssc = ...;
+    JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
+
+See end-to-end examples at [Akka Examples](https://github.com/apache/bahir/tree/master/streaming-akka/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-streaming-mqtt.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-streaming-mqtt.md b/site/docs/spark/2.1.1/spark-streaming-mqtt.md
new file mode 100644
index 0000000..a201a07
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-streaming-mqtt.md
@@ -0,0 +1,98 @@
+---
+layout: page
+title: Spark Streaming MQTT
+description: Spark Streaming MQTT
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-mqtt_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Configuration options
+
+This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients/java/). Client API documentation is located [here](http://www.eclipse.org/paho/files/javadoc/index.html).
+
+ * `brokerUrl` The URL the MqttClient connects to. Set this to the URL of the MQTT server, e.g. tcp://localhost:1883.
+ * `storageLevel` The storage level for incoming messages; by default incoming messages are stored on disk.
+ * `topic` The topic the MqttClient subscribes to.
+ * `topics` List of topics the MqttClient subscribes to.
+ * `clientId` The client ID this client is associated with. Provide the same value to recover a stopped client.
+ * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
+ * `username` Sets the user name to use for the connection to the MQTT server. Do not set it if the server does not require it; setting it to an empty value will lead to errors.
+ * `password` Sets the password to use for the connection.
+ * `cleanSession` Setting this to true starts a clean session and removes all messages checkpointed by a previous run of this source. It is set to false by default.
+ * `connectionTimeout` Sets the connection timeout, a value of 0 is interpreted as wait until client connects. See `MqttConnectOptions.setConnectionTimeout` for more information.
+ * `keepAlive` Same as `MqttConnectOptions.setKeepAliveInterval`.
+ * `mqttVersion` Same as `MqttConnectOptions.setMqttVersion`.
+
+
+## Examples
+
+### Scala API
+
+You can create a DStream of MQTT messages with `MQTTUtils.createStream`, or a DStream of (topic, message) pairs with `MQTTUtils.createPairedStream`:
+
+    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
+
+Additional MQTT connection options can be provided:
+
+```Scala
+val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+```
+
+### Java API
+
+In Java, you can likewise create a DStream of MQTT messages or a DStream of (topic, message) pairs:
+
+    JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+    JavaReceiverInputDStream<Tuple2<String, String>> lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
+
+See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-streaming-pubsub.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-streaming-pubsub.md b/site/docs/spark/2.1.1/spark-streaming-pubsub.md
new file mode 100644
index 0000000..1b3ba5a
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-streaming-pubsub.md
@@ -0,0 +1,71 @@
+---
+layout: page
+title: Spark Streaming Google Pub-Sub
+description: Spark Streaming Google Pub-Sub
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubsub_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+First you need to create credentials using SparkGCPCredentials; it supports four types of credentials:
+* application default
+    `SparkGCPCredentials.builder.build()`
+* json type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* p12 type service account
+    `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (when running on Dataproc)
+    `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+
+    val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..)
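+
+A fuller sketch might look like the following (the project, subscription, and key path are placeholders, and it assumes the `createStream` variant whose final argument is a `StorageLevel`, with classes imported from the Bahir streaming-pubsub module):
+
+    import org.apache.spark.SparkConf
+    import org.apache.spark.storage.StorageLevel
+    import org.apache.spark.streaming.{Seconds, StreamingContext}
+    import org.apache.spark.streaming.pubsub.{PubsubUtils, SparkGCPCredentials}
+
+    val sparkConf = new SparkConf().setAppName("PubsubSketch")
+    val ssc = new StreamingContext(sparkConf, Seconds(10))
+    val credential = SparkGCPCredentials.builder
+        .jsonServiceAccount("/path/to/service-account-key.json")
+        .build()
+    val messages = PubsubUtils.createStream(
+        ssc, "my-gcp-project", "my-subscription", credential,
+        StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    // Print a running count of received Pub/Sub messages in each batch
+    messages.count().print()
+
+    ssc.start()
+    ssc.awaitTermination()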
+
+### Java API
+
+    JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...)
+
+See end-to-end examples at [Google Cloud Pubsub Examples](streaming-pubsub/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-streaming-twitter.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-streaming-twitter.md b/site/docs/spark/2.1.1/spark-streaming-twitter.md
new file mode 100644
index 0000000..3fed7bc
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-streaming-twitter.md
@@ -0,0 +1,74 @@
+---
+layout: page
+title: Spark Streaming Twitter
+description: Spark Streaming Twitter
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading social data from [twitter](http://twitter.com/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-twitter_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+
+## Examples
+
+`TwitterUtils` uses Twitter4j to get the public stream of tweets using [Twitter's Streaming API](https://dev.twitter.com/docs/streaming-apis). Authentication information
+can be provided by any of the [methods](http://twitter4j.org/en/configuration.html) supported by the Twitter4J library. You can import the `TwitterUtils` class and create a DStream with `TwitterUtils.createStream` as shown below.
+
+### Scala API
+
+    import org.apache.spark.streaming.twitter._
+
+    TwitterUtils.createStream(ssc, None)
+
+### Java API
+
+    import org.apache.spark.streaming.twitter.*;
+
+    TwitterUtils.createStream(jssc);
+
+
+You can either get the full public stream or a stream filtered by keywords, as sketched below.
+See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)
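+
+As a sketch, a keyword-filtered stream could be created as follows (the keywords are placeholders, and the OAuth credentials are assumed to be supplied through the standard `twitter4j.oauth.*` system properties):
+
+    import org.apache.spark.streaming.twitter._
+
+    // Credentials picked up from the Twitter4J configuration, e.g.
+    // System.setProperty("twitter4j.oauth.consumerKey", "...")
+    // System.setProperty("twitter4j.oauth.consumerSecret", "...")
+    // System.setProperty("twitter4j.oauth.accessToken", "...")
+    // System.setProperty("twitter4j.oauth.accessTokenSecret", "...")
+
+    // Stream of tweets matching any of the given keywords
+    val filters = Seq("spark", "bahir")
+    val filteredTweets = TwitterUtils.createStream(ssc, None, filters)
+    filteredTweets.map(status => status.getText).print()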

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/2.1.1/spark-streaming-zeromq.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/2.1.1/spark-streaming-zeromq.md b/site/docs/spark/2.1.1/spark-streaming-zeromq.md
new file mode 100644
index 0000000..df9062c
--- /dev/null
+++ b/site/docs/spark/2.1.1/spark-streaming-zeromq.md
@@ -0,0 +1,65 @@
+---
+layout: page
+title: Spark Streaming ZeroMQ
+description: Spark Streaming ZeroMQ
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+
+A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % "2.1.1"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-zeromq_2.11</artifactId>
+        <version>2.1.1</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-zeromq_2.11:2.1.1
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is cross-published for Scala 2.10 and Scala 2.11, so users should substitute the appropriate Scala version (2.10 or 2.11) in the commands listed above.
+
+## Examples
+
+
+### Scala API
+
+    val lines = ZeroMQUtils.createStream(ssc, ...)
+
+### Java API
+
+    JavaDStream<String> lines = ZeroMQUtils.createStream(jssc, ...);
+
+See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/documentation.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/documentation.md b/site/docs/spark/current/documentation.md
index 38148e9..7eb03c6 100644
--- a/site/docs/spark/current/documentation.md
+++ b/site/docs/spark/current/documentation.md
@@ -29,8 +29,16 @@ limitations under the License.
 
 <br/>
 
+#### SQL  Data Sources
+
+[Apache CouchDB/Cloudant data source](../spark-sql-cloudant)
+
+<br/>
+
 #### Structured Streaming Data Sources
 
+[Akka data source](../spark-sql-streaming-akka)
+
 [MQTT data source](../spark-sql-streaming-mqtt)
 
 <br/>
@@ -39,8 +47,10 @@ limitations under the License.
 
 [Akka connector](../spark-streaming-akka)
 
+[Google Cloud Pub/Sub connector](../spark-streaming-pubsub)
+
 [MQTT connector](../spark-streaming-mqtt)
 
 [Twitter connector](../spark-streaming-twitter)
 
-[ZeroMQ connector](../spark-streaming-zeromq)
\ No newline at end of file
+[ZeroMQ connector](../spark-streaming-zeromq)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-sql-cloudant.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-sql-cloudant.md b/site/docs/spark/current/spark-sql-cloudant.md
new file mode 100644
index 0000000..1065f29
--- /dev/null
+++ b/site/docs/spark/current/spark-sql-cloudant.md
@@ -0,0 +1,316 @@
+---
+layout: page
+title: Spark Data Source for Apache CouchDB/Cloudant
+description: Spark Data Source for Apache CouchDB/Cloudant
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Cloudant or CouchDB databases using Spark SQL and Spark Streaming. 
+
+[IBM® Cloudant®](https://cloudant.com) is a document-oriented DataBase as a Service (DBaaS). It stores data as documents 
+in JSON format. It's built with scalability, high availability, and durability in mind. It comes with a 
+wide variety of indexing options including map-reduce, Cloudant Query, full-text indexing, and 
+geospatial indexing. The replication capabilities make it easy to keep data in sync between database 
+clusters, desktop PCs, and mobile devices.
+
+[Apache CouchDB™](http://couchdb.apache.org) is open source database software that focuses on ease of use and having an architecture that "completely embraces the Web". It has a document-oriented NoSQL database architecture and is implemented in the concurrency-oriented language Erlang; it uses JSON to store data, JavaScript as its query language using MapReduce, and HTTP for an API.
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-cloudant" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-cloudant_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+Submit a job in Python:
+    
+    spark-submit  --master local[4] --jars <path to cloudant-spark.jar>  <path to python script> 
+    
+Submit a job in Scala:
+
+	spark-submit --class "<your class>" --master local[4] --jars <path to cloudant-spark.jar> <path to your app jar>
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+
+## Configuration options	
+The configuration is obtained in the following sequence:
+
+1. default in the Config, which is set in application.conf
+2. key in the SparkConf, which is set in SparkConf
+3. key in the parameters, which is set in dataframe or temporary table options
+4. "spark."+key in the SparkConf (as these are treated as the keys passed in through spark-submit using the --conf option)
+
+Here each subsequent configuration overrides the previous one. Thus, configuration set using a DataFrame option overrides what has been set in SparkConf, and configuration passed to spark-submit using --conf takes precedence over any setting in the code.
+
+
+### Configuration in application.conf
+Default values are defined [here](cloudant-spark-sql/src/main/resources/application.conf).
+
+### Configuration on SparkConf
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.protocol|https|protocol to use to transfer data: http or https
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+cloudant.useQuery|false|By default, _all_docs endpoint is used if configuration 'view' and 'index' (see below) are not set. When useQuery is enabled, _find endpoint will be used in place of _all_docs when query condition is not on primary key field (_id), so that query predicates may be driven into datastore. 
+cloudant.queryLimit|25|The maximum number of results returned when querying the _find endpoint.
+jsonstore.rdd.partitions|10|the number of partitions intended to be used to drive JsonStoreRDD to load query results in parallel. The actual number is calculated based on the total rows returned, while satisfying maxInPartition and minInPartition
+jsonstore.rdd.maxInPartition|-1|the max rows in a partition. -1 means unlimited
+jsonstore.rdd.minInPartition|10|the min rows in a partition.
+jsonstore.rdd.requestTimeout|900000| the request timeout in milliseconds
+bulkSize|200| the bulk save size
+schemaSampleSize| "-1" | the sample size for RDD schema discovery. 1 means use only the first document for schema discovery; -1 means all documents; 0 will be treated as 1; any other number N means min(N, total) documents
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. 
+
+### Configuration on Spark SQL Temporary Table or DataFrame
+
+In addition to all the configurations passed to a temporary table or DataFrame through SparkConf, it is also possible to set the following configurations on a temporary table or DataFrame using OPTIONS, as illustrated in the Scala sketch after the view example below:
+
+Name | Default | Meaning
+--- |:---:| ---
+database||cloudant database name
+view||cloudant view w/o the database name. Only used for load.
+index||cloudant search index w/o the database name. Only used to load data with less than or equal to 200 results.
+path||cloudant path; used as the database name if 'database' is not present
+schemaSampleSize|"-1"| the sample size used to discover the schema for this temp table. -1 scans all documents
+bulkSize|200| the bulk save size
+createDBOnSave|"false"| whether to create a new database during save operation. If false, a database should already exist. If true, a new database will be created. If true, and a database with a provided name already exists, an error will be raised. 
+
+For fast loading, views are loaded without include_docs. Thus, a derived schema will always be: `{id, key, value}`, where `value` can be a compound field. An example of loading data from a view:
+
+```python
+spark.sql(" CREATE TEMPORARY TABLE flightTable1 USING org.apache.bahir.cloudant OPTIONS ( database 'n_flight', view '_design/view/_view/AA0')")
+
+```
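+
+The view and save-related options can also be set directly on a DataFrame. Below is a minimal Scala sketch, assuming a SparkSession already configured with the Cloudant connection settings and the same database and view as the examples on this page; the target database name `n_flight_view_copy` is illustrative:
+
+```scala
+// Load from a view; the derived schema is always {id, key, value}.
+val viewDF = spark.read
+  .format("org.apache.bahir.cloudant")
+  .option("view", "_design/view/_view/AA0")
+  .load("n_flight")
+
+// Save with per-write options: create the target database if it does not
+// already exist, and write in bulks of 500 documents.
+viewDF.write
+  .format("org.apache.bahir.cloudant")
+  .option("createDBOnSave", "true")
+  .option("bulkSize", "500")
+  .save("n_flight_view_copy")
+```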
+
+### Configuration on Cloudant Receiver for Spark Streaming
+
+Name | Default | Meaning
+--- |:---:| ---
+cloudant.host||cloudant host url
+cloudant.username||cloudant userid
+cloudant.password||cloudant password
+database||cloudant database name
+selector| all documents| a selector written in Cloudant Query syntax, specifying conditions for selecting documents. Only documents satisfying the selector's conditions will be retrieved from Cloudant and loaded into Spark.
+
+
+### Configuration in spark-submit using --conf option
+
+The configuration keys stated above can also be set using the `spark-submit --conf` option. When passing configuration in spark-submit, make sure to add "spark." as a prefix to the keys (for example, `cloudant.host` becomes `spark.cloudant.host`).
+
+
+## Examples
+
+### Python API
+
+#### Using SQL In Python 
+	
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using temp tables")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .getOrCreate()
+
+
+# Loading temp table from Cloudant db
+spark.sql(" CREATE TEMPORARY TABLE airportTable USING org.apache.bahir.cloudant OPTIONS ( database 'n_airportcodemapping')")
+airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+print('Total # of rows in airportData: ' + str(airportData.count()))
+for code in airportData.collect():
+    print(code._id)
+```
+
+See [CloudantApp.py](examples/python/CloudantApp.py) for examples.
+
+Submit job example:
+```
+spark-submit  --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD sql-cloudant/examples/python/CloudantApp.py
+```
+
+#### Using DataFrame In Python 
+
+```python
+spark = SparkSession\
+    .builder\
+    .appName("Cloudant Spark SQL Example in Python using dataframes")\
+    .config("cloudant.host","ACCOUNT.cloudant.com")\
+    .config("cloudant.username", "USERNAME")\
+    .config("cloudant.password","PASSWORD")\
+    .config("jsonstore.rdd.partitions", 8)\
+    .getOrCreate()
+
+# ***1. Loading dataframe from Cloudant db
+df = spark.read.load("n_airportcodemapping", "org.apache.bahir.cloudant")
+df.cache() 
+df.printSchema()
+df.filter(df.airportName >= 'Moscow').select("_id",'airportName').show()
+df.filter(df._id >= 'CAA').select("_id",'airportName').show()	    
+```
+
+See [CloudantDF.py](examples/python/CloudantDF.py) for examples.
+	
+When doing multiple operations on a DataFrame (select, filter, etc.),
+you should persist it. Otherwise, every operation on the DataFrame will load the same data from Cloudant again.
+Persisting will also speed up computation. The statement `df.cache()` persists the DataFrame in memory. Alternatively, for large databases, persist to both memory and disk:
+
+```python
+from pyspark import StorageLevel
+df.persist(storageLevel = StorageLevel(True, True, False, True, 1))
+```
+
+[Sample code](examples/python/CloudantDFOption.py) on using DataFrame options to define Cloudant configuration.
+
+### Scala API
+
+#### Using SQL In Scala 
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .getOrCreate()
+
+// For implicit conversions of Dataframe to RDDs
+import spark.implicits._
+    
+// create a temp table from Cloudant db and query it using sql syntax
+spark.sql(
+    s"""
+    |CREATE TEMPORARY TABLE airportTable
+    |USING org.apache.bahir.cloudant
+    |OPTIONS ( database 'n_airportcodemapping')
+    """.stripMargin)
+// create a dataframe
+val airportData = spark.sql("SELECT _id, airportName FROM airportTable WHERE _id >= 'CAA' AND _id <= 'GAA' ORDER BY _id")
+airportData.printSchema()
+println(s"Total # of rows in airportData: " + airportData.count())
+// convert dataframe to array of Rows, and process each row
+airportData.map(t => "code: " + t(0) + ",name:" + t(1)).collect().foreach(println)
+```
+See [CloudantApp.scala](examples/scala/src/main/scala/mytest/spark/CloudantApp.scala) for examples.
+
+Submit job example:
+```
+spark-submit --class org.apache.spark.examples.sql.cloudant.CloudantApp --packages org.apache.bahir:spark-sql-cloudant_2.11:2.2.0-SNAPSHOT --conf spark.cloudant.host=ACCOUNT.cloudant.com --conf spark.cloudant.username=USERNAME --conf spark.cloudant.password=PASSWORD  /path/to/spark-sql-cloudant_2.11-2.2.0-SNAPSHOT-tests.jar
+```
+
+#### Using DataFrame In Scala
+
+```scala
+val spark = SparkSession
+      .builder()
+      .appName("Cloudant Spark SQL Example with Dataframe")
+      .config("cloudant.host","ACCOUNT.cloudant.com")
+      .config("cloudant.username", "USERNAME")
+      .config("cloudant.password","PASSWORD")
+      .config("createDBOnSave","true") // to create a db on save
+      .config("jsonstore.rdd.partitions", "20") // using 20 partitions
+      .getOrCreate()
+          
+// 1. Loading data from Cloudant db
+val df = spark.read.format("org.apache.bahir.cloudant").load("n_flight")
+// Caching df in memory to speed computations
+// and not to retrieve data from cloudant again
+df.cache() 
+df.printSchema()
+
+// 2. Saving dataframe to Cloudant db
+val df2 = df.filter(df("flightSegmentId") === "AA106")
+    .select("flightSegmentId","economyClassBaseCost")
+df2.show()
+df2.write.format("org.apache.bahir.cloudant").save("n_flight2")
+```
+
+See [CloudantDF.scala](examples/scala/src/main/scala/mytest/spark/CloudantDF.scala) for examples.
+    
+[Sample code](examples/scala/src/main/scala/mytest/spark/CloudantDFOption.scala) on using DataFrame option to define Cloudant configuration.
+ 
+ 
+#### Using Streams In Scala
+
+```scala
+val ssc = new StreamingContext(sparkConf, Seconds(10))
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "n_airportcodemapping")))
+
+changes.foreachRDD((rdd: RDD[String], time: Time) => {
+  // Get the singleton instance of SparkSession
+  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
+
+  println(s"========= $time =========")
+  // Convert RDD[String] to DataFrame
+  val changesDataFrame = spark.read.json(rdd)
+  if (!changesDataFrame.schema.isEmpty) {
+    changesDataFrame.printSchema()
+    changesDataFrame.select("*").show()
+    ....
+  }
+})
+ssc.start()
+// run streaming for 120 secs
+Thread.sleep(120000L)
+ssc.stop(true)
+	
+```
+
+See [CloudantStreaming.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreaming.scala) for examples.
+
+By default, Spark Streaming will load all documents from a database. If you want to limit the loading to 
+specific documents, use the `selector` option of `CloudantReceiver` and specify your conditions
+(See [CloudantStreamingSelector.scala](examples/scala/src/main/scala/mytest/spark/CloudantStreamingSelector.scala)
+example for more details):
+
+```scala
+val changes = ssc.receiverStream(new CloudantReceiver(Map(
+  "cloudant.host" -> "ACCOUNT.cloudant.com",
+  "cloudant.username" -> "USERNAME",
+  "cloudant.password" -> "PASSWORD",
+  "database" -> "sales",
+  "selector" -> "{\"month\":\"May\", \"rep\":\"John\"}")))
+```

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-sql-streaming-akka.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-sql-streaming-akka.md b/site/docs/spark/current/spark-sql-streaming-akka.md
new file mode 100644
index 0000000..40ad088
--- /dev/null
+++ b/site/docs/spark/current/spark-sql-streaming-akka.md
@@ -0,0 +1,137 @@
+---
+layout: page
+title: Spark Structured Streaming Akka
+description: Spark Structured Streaming Akka
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from Akka Actors using Spark SQL Streaming (or Structured Streaming).
+
+## Linking
+
+Using SBT:
+
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-akka" % "2.2.0-SNAPSHOT"
+
+Using Maven:
+
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-sql-streaming-akka_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-akka_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+This library is compiled for Scala 2.11 only, and is intended to support Spark 2.0 onwards.
+
+## Examples
+
+A SQL Stream can be created with data streams received from an Akka Feeder actor using:
+
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .load()
+                
+## Enabling recovery from failures
+
+Setting the `persistenceDirPath` option helps recover from a restart by restoring the source to the state where it left off before the shutdown.
+                
+        sqlContext.readStream
+                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                .option("urlOfPublisher", "feederActorUri")
+                .option("persistenceDirPath", "/path/to/localdir")
+                .load() 
+                       
+## Configuration options
+                       
+This source uses [Akka Actor api](http://doc.akka.io/api/akka/2.4/akka/actor/Actor.html).
+                       
+* `urlOfPublisher` The URL of the Publisher or Feeder actor that the Receiver actor connects to. Set this to the TCP URL of the Publisher or Feeder actor.
+* `persistenceDirPath` Path to a local directory where incoming messages are stored on disk; this is what allows the source to restore its state after a restart (see above).
+
+### Scala API
+
+An example using the Scala API to count words from an incoming message stream:
+
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        val lines = spark.readStream
+                    .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                    .option("urlOfPublisher", urlOfPublisher)
+                    .load().as[(String, Timestamp)]
+    
+        // Split the lines into words
+        val words = lines.map(_._1).flatMap(_.split(" "))
+    
+        // Generate running word count
+        val wordCounts = words.groupBy("value").count()
+    
+        // Start running the query that prints the running counts to the console
+        val query = wordCounts.writeStream
+                    .outputMode("complete")
+                    .format("console")
+                    .start()
+    
+        query.awaitTermination()
+        
+Please see `AkkaStreamWordCount.scala` for a full example.
+   
+### Java API
+   
+An example using the Java API to count words from an incoming message stream:
+   
+        // Create DataFrame representing the stream of input lines from connection
+        // to publisher or feeder actor
+        Dataset<String> lines = spark
+                                .readStream()
+                                .format("org.apache.bahir.sql.streaming.akka.AkkaStreamSourceProvider")
+                                .option("urlOfPublisher", urlOfPublisher)
+                                .load().select("value").as(Encoders.STRING());
+    
+        // Split the lines into words
+        Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+          @Override
+          public Iterator<String> call(String s) throws Exception {
+            return Arrays.asList(s.split(" ")).iterator();
+          }
+        }, Encoders.STRING());
+    
+        // Generate running word count
+        Dataset<Row> wordCounts = words.groupBy("value").count();
+    
+        // Start running the query that prints the running counts to the console
+        StreamingQuery query = wordCounts.writeStream()
+                                .outputMode("complete")
+                                .format("console")
+                                .start();
+    
+        query.awaitTermination();   
+         
+Please see `JavaAkkaStreamWordCount.java` for a full example.

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-sql-streaming-mqtt.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-sql-streaming-mqtt.md b/site/docs/spark/current/spark-sql-streaming-mqtt.md
index eeda845..83d9c82 100644
--- a/site/docs/spark/current/spark-sql-streaming-mqtt.md
+++ b/site/docs/spark/current/spark-sql-streaming-mqtt.md
@@ -25,26 +25,26 @@ limitations under the License.
 
 {% include JB/setup %}
 
-A library for reading data from MQTT Servers using Spark SQL Streaming ( or Structured streaming.). 
+A library for reading data from MQTT Servers using Spark SQL Streaming (or Structured Streaming).
 
 ## Linking
 
 Using SBT:
 
-    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.1.0"
+    libraryDependencies += "org.apache.bahir" %% "spark-sql-streaming-mqtt" % "2.2.0-SNAPSHOT"
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-sql-streaming-mqtt_2.11</artifactId>
-        <version>2.1.0</version>
+        <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
-    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.1.0
+    $ bin/spark-shell --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
@@ -89,7 +89,7 @@ This source uses [Eclipse Paho Java Client](https://eclipse.org/paho/clients/jav
 
 ### Scala API
 
-An example, for scala API to count words from incoming message stream. 
+An example using the Scala API to count words from an incoming message stream:
 
     // Create DataFrame representing the stream of input lines from connection to mqtt server
     val lines = spark.readStream
@@ -115,7 +115,7 @@ Please see `MQTTStreamWordCount.scala` for full example.
 
 ### Java API
 
-An example, for Java API to count words from incoming message stream. 
+An example using the Java API to count words from an incoming message stream:
 
     // Create DataFrame representing the stream of input lines from connection to mqtt server.
     Dataset<String> lines = spark
@@ -144,4 +144,3 @@ An example, for Java API to count words from incoming message stream.
     query.awaitTermination();
 
 Please see `JavaMQTTStreamWordCount.java` for full example.
-

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-streaming-akka.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-streaming-akka.md b/site/docs/spark/current/spark-streaming-akka.md
index 57ef91d..17a91bf 100644
--- a/site/docs/spark/current/spark-streaming-akka.md
+++ b/site/docs/spark/current/spark-streaming-akka.md
@@ -25,26 +25,26 @@ limitations under the License.
 
 {% include JB/setup %}
 
-A library for reading data from Akka Actors using Spark Streaming. 
+A library for reading data from Akka Actors using Spark Streaming.
 
 ## Linking
 
 Using SBT:
 
-    libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.1.0"
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.2.0-SNAPSHOT"
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-streaming-akka_2.11</artifactId>
-        <version>2.1.0</version>
+        <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
-    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.1.0
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-streaming-mqtt.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-streaming-mqtt.md b/site/docs/spark/current/spark-streaming-mqtt.md
index 38d2922..14b072b 100644
--- a/site/docs/spark/current/spark-streaming-mqtt.md
+++ b/site/docs/spark/current/spark-streaming-mqtt.md
@@ -26,26 +26,26 @@ limitations under the License.
 {% include JB/setup %}
 
 
-[MQTT](http://mqtt.org/) is MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. 
+[MQTT](http://mqtt.org/) is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.
 
 ## Linking
 
 Using SBT:
 
-    libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.1.0"
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.2.0-SNAPSHOT"
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-streaming-mqtt_2.11</artifactId>
-        <version>2.1.0</version>
+        <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
-    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.1.0
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
@@ -59,6 +59,7 @@ This source uses the [Eclipse Paho Java Client](https://eclipse.org/paho/clients
  * `brokerUrl` A url MqttClient connects to. Set this as the url of the Mqtt Server. e.g. tcp://localhost:1883.
  * `storageLevel` By default it is used for storing incoming messages on disk.
  * `topic` Topic MqttClient subscribes to.
+ * `topics` List of topics MqttClient subscribes to.
 * `clientId` The clientId this client is associated with. Provide the same value to recover a stopped client.
  * `QoS` The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.
  * `username` Sets the user name to use for the connection to Mqtt Server. Do not set it, if server does not need this. Setting it empty will lead to errors.
@@ -77,11 +78,13 @@ You need to extend `ActorReceiver` so as to store received data into Spark using
 this actor can be configured to handle failures, etc.
 
     val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+    val pairedLines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics)
 
 Additional mqtt connection options can be provided:
 
 ```Scala
 val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val pairedLines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
 ```
 
 ### Java API
@@ -90,5 +93,6 @@ You need to extend `JavaActorReceiver` so as to store received data into Spark u
 this actor can be configured to handle failures, etc.
 
     JavaDStream<String> lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+    JavaReceiverInputDStream<Tuple2<String, String>> pairedLines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
 
-See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)
\ No newline at end of file
+See end-to-end examples at [MQTT Examples](https://github.com/apache/bahir/tree/master/streaming-mqtt/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-streaming-pubsub.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-streaming-pubsub.md b/site/docs/spark/current/spark-streaming-pubsub.md
new file mode 100644
index 0000000..6e36a60
--- /dev/null
+++ b/site/docs/spark/current/spark-streaming-pubsub.md
@@ -0,0 +1,71 @@
+---
+layout: page
+title: Spark Streaming Google Pub-Sub
+description: Spark Streaming Google Pub-Sub
+group: nav-right
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+{% include JB/setup %}
+A library for reading data from [Google Cloud Pub/Sub](https://cloud.google.com/pubsub/) using Spark Streaming.
+
+## Linking
+
+Using SBT:
+    
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.2.0-SNAPSHOT"
+    
+Using Maven:
+    
+    <dependency>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>spark-streaming-pubsub_2.11</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </dependency>
+
+This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
+For example, to include it when starting the spark shell:
+
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.2.0-SNAPSHOT
+
+Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
+The `--packages` argument can also be used with `bin/spark-submit`.
+
+## Examples
+
+First you need to create credentials using `SparkGCPCredentials`; it supports four types of credentials:
+* application default
+    `SparkGCPCredentials.builder.build()`
+* json type service account
+    `SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()`
+* p12 type service account
+    `SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()`
+* metadata service account (when running on Dataproc)
+    `SparkGCPCredentials.builder.metadataServiceAccount().build()`
+
+### Scala API
+    
+    val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..)
+    
+### Java API
+    
+    JavaDStream<SparkPubsubMessage> lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...) 
+
+See end-to-end examples at [Google Cloud Pubsub Examples](https://github.com/apache/bahir/tree/master/streaming-pubsub/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-streaming-twitter.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-streaming-twitter.md b/site/docs/spark/current/spark-streaming-twitter.md
index 9e05f07..74029af 100644
--- a/site/docs/spark/current/spark-streaming-twitter.md
+++ b/site/docs/spark/current/spark-streaming-twitter.md
@@ -25,26 +25,26 @@ limitations under the License.
 
 {% include JB/setup %}
 
-A library for reading social data from [twitter](http://twitter.com/) using Spark Streaming. 
+A library for reading social data from [twitter](http://twitter.com/) using Spark Streaming.
 
 ## Linking
 
 Using SBT:
 
-    libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.2.0-SNAPSHOT"
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-streaming-twitter_2.11</artifactId>
-        <version>2.1.0</version>
+        <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
-    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.0
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
@@ -70,5 +70,5 @@ can be provided by any of the [methods](http://twitter4j.org/en/configuration.ht
     TwitterUtils.createStream(jssc);
 
 
-You can also either get the public stream, or get the filtered stream based on keywords. 
-See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)
\ No newline at end of file
+You can also either get the public stream, or get the filtered stream based on keywords.
+See end-to-end examples at [Twitter Examples](https://github.com/apache/bahir/tree/master/streaming-twitter/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/current/spark-streaming-zeromq.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/current/spark-streaming-zeromq.md b/site/docs/spark/current/spark-streaming-zeromq.md
index cf10156..828aa1c 100644
--- a/site/docs/spark/current/spark-streaming-zeromq.md
+++ b/site/docs/spark/current/spark-streaming-zeromq.md
@@ -25,26 +25,26 @@ limitations under the License.
 
 {% include JB/setup %}
 
-A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming. 
+A library for reading data from [ZeroMQ](http://zeromq.org/) using Spark Streaming.
 
 ## Linking
 
 Using SBT:
 
-    libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % "2.1.0"
+    libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % "2.2.0-SNAPSHOT"
 
 Using Maven:
 
     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>spark-streaming-zeromq_2.11</artifactId>
-        <version>2.1.0</version>
+        <version>2.2.0-SNAPSHOT</version>
     </dependency>
 
 This library can also be added to Spark jobs launched through `spark-shell` or `spark-submit` by using the `--packages` command line option.
 For example, to include it when starting the spark shell:
 
-    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-zeromq_2.11:2.1.0
+    $ bin/spark-shell --packages org.apache.bahir:spark-streaming-zeromq_2.11:2.2.0-SNAPSHOT
 
 Unlike using `--jars`, using `--packages` ensures that this library and its dependencies will be added to the classpath.
 The `--packages` argument can also be used with `bin/spark-submit`.
@@ -62,4 +62,4 @@ This library is cross-published for Scala 2.10 and Scala 2.11, so users should r
 
     JavaDStream<String> lines = ZeroMQUtils.createStream(jssc, ...);
 
-See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples)
\ No newline at end of file
+See end-to-end examples at [ZeroMQ Examples](https://github.com/apache/bahir/tree/master/streaming-zeromq/examples)

http://git-wip-us.apache.org/repos/asf/bahir-website/blob/21c1d626/site/docs/spark/overview.md
----------------------------------------------------------------------
diff --git a/site/docs/spark/overview.md b/site/docs/spark/overview.md
index 7b232ce..7c52280 100644
--- a/site/docs/spark/overview.md
+++ b/site/docs/spark/overview.md
@@ -28,7 +28,8 @@ limitations under the License.
 ### Apache Bahir Extensions for Apache Spark
 
  - [Current - 2.2.0-SNAPSHOT](/docs/spark/current/documentation)
+ - [2.1.1](/docs/spark/2.1.1/documentation)
  - [2.1.0](/docs/spark/2.1.0/documentation)
  - [2.0.2](/docs/spark/2.0.2/documentation)
  - [2.0.1](/docs/spark/2.0.1/documentation)
- - [2.0.0](/docs/spark/2.0.0/documentation)
\ No newline at end of file
+ - [2.0.0](/docs/spark/2.0.0/documentation)