<title>Spark Streaming Akka</title>
<body>
<div class="container">
+<!--<div class="hero-unit Spark Streaming Akka">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
+<p>A library for reading data from Akka Actors using Spark Streaming.</p>
+<h2 id="linking">Linking</h2>
+<p>Using SBT:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-akka" % "2.1.3"
</code></pre>
</div>
<p>Using Maven:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
+    &lt;artifactId&gt;spark-streaming-akka_2.11&lt;/artifactId&gt;
+    &lt;version&gt;2.1.3&lt;/version&gt;
+&lt;/dependency&gt;
+</code></pre>
+</div>
<p>This library can also be added to Spark jobs launched through <code class="highlighter-rouge">spark-shell</code> or <code class="highlighter-rouge">spark-submit</code> by using the <code class="highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-akka_2.11:2.1.3
</code></pre>
</div>
+<p>Unlike using <code class="highlighter-rouge">--jars</code>, using <code class="highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
+The <code class="highlighter-rouge">--packages</code> argument can also be used with <code class="highlighter-rouge">bin/spark-submit</code>.</p>
<p>This library is cross-published for Scala 2.10 and Scala 2.11, so substitute the Scala version you are using (2.10 or 2.11) in the artifact names and commands listed above.</p>
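<p>As a rough illustration of where the Scala version goes, the following Java sketch assembles the <code class="highlighter-rouge">--packages</code> coordinate for a given Scala binary version. The class and method names (<code class="highlighter-rouge">BahirCoordinates</code>, <code class="highlighter-rouge">packageCoordinate</code>) are hypothetical helpers for this example and are not part of Bahir.</p>

```java
public class BahirCoordinates {

    // Hypothetical helper: joins the pieces into "groupId:artifactId_scalaVersion:version",
    // the form expected by the --packages option.
    static String packageCoordinate(String module, String scalaBinaryVersion, String version) {
        return "org.apache.bahir:" + module + "_" + scalaBinaryVersion + ":" + version;
    }

    public static void main(String[] args) {
        // The same module and release, built for the two Scala versions.
        System.out.println(packageCoordinate("spark-streaming-akka", "2.10", "2.1.3"));
        // prints org.apache.bahir:spark-streaming-akka_2.10:2.1.3
        System.out.println(packageCoordinate("spark-streaming-akka", "2.11", "2.1.3"));
        // prints org.apache.bahir:spark-streaming-akka_2.11:2.1.3
    }
}
```

<p>Only the suffix after the underscore changes between the two Scala builds; the group ID and the release version stay the same.</p>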
<h2 id="examples">Examples</h2>
<p>DStreams can be created with data streams received through Akka actors by using <code class="highlighter-rouge">AkkaUtils.createStream(ssc, actorProps, actor-name)</code>.</p>
<h3 id="scala-api">Scala API</h3>
<p>You need to extend <code class="highlighter-rouge">ActorReceiver</code> so as to store received data into Spark using <code class="highlighter-rouge">store(...)</code> methods. The supervisor strategy of
this actor can be configured to handle failures, etc.</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>class CustomActor extends ActorReceiver {
+  def receive = {
+    case data: String =&gt; store(data)
+  }
+}
+
+// A new input stream can be created with this custom actor as
+val ssc: StreamingContext = ...
+val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
+</code></pre>
+</div>
<h3 id="java-api">Java API</h3>
<p>You need to extend <code class="highlighter-rouge">JavaActorReceiver</code> so as to store received data into Spark using <code class="highlighter-rouge">store(...)</code> methods. The supervisor strategy of
this actor can be configured to handle failures, etc.</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>class CustomActor extends JavaActorReceiver {
+    @Override
+    public void onReceive(Object msg) throws Exception {
+        store((String) msg);
+    }
+}
+
+// A new input stream can be created with this custom actor as
+JavaStreamingContext jssc = ...;
+JavaDStream&lt;String&gt; lines = AkkaUtils.&lt;String&gt;createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
+</code></pre>
+</div>
<p>See end-to-end examples at <a href="">Akka Examples</a></p>
</div>
+  </body>
<title>Spark Structured Streaming MQTT</title>
<body>
<div class="container">
+<!--<div class="hero-unit Spark Structured Streaming MQTT">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
<p><a href="">MQTT</a> is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium.</p>
<h2 id="linking">Linking</h2>
<p>Using SBT:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-mqtt" % "2.1.3"
</code></pre>
</div>
<p>Using Maven:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
+    &lt;artifactId&gt;spark-streaming-mqtt_2.11&lt;/artifactId&gt;
+    &lt;version&gt;2.1.3&lt;/version&gt;
+&lt;/dependency&gt;
+</code></pre>
+</div>
<p>This library can also be added to Spark jobs launched through <code class="highlighter-rouge">spark-shell</code> or <code class="highlighter-rouge">spark-submit</code> by using the <code class="highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-mqtt_2.11:2.1.3
</code></pre>
</div>
+<p>Unlike using <code class="highlighter-rouge">--jars</code>, using <code class="highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
+The <code class="highlighter-rouge">--packages</code> argument can also be used with <code class="highlighter-rouge">bin/spark-submit</code>.</p>
<p>This library is cross-published for Scala 2.10 and Scala 2.11, so substitute the Scala version you are using (2.10 or 2.11) in the artifact names and commands listed above.</p>
<h2 id="configuration-options">Configuration options</h2>
+<p>This source uses the <a href="">Eclipse Paho Java Client</a>. Client API documentation is located <a href="">here</a>.</p>
+<ul>
+  <li><code class="highlighter-rouge">brokerUrl</code> The URL the MqttClient connects to. Set this to the URL of the MQTT server, e.g. tcp://localhost:1883.</li>
+  <li><code class="highlighter-rouge">storageLevel</code> The storage level for incoming messages. By default, messages are stored on disk.</li>
+  <li><code class="highlighter-rouge">topic</code> The topic the MqttClient subscribes to.</li>
+  <li><code class="highlighter-rouge">topics</code> The list of topics the MqttClient subscribes to.</li>
+  <li><code class="highlighter-rouge">clientId</code> The client ID this client is associated with. Provide the same value to recover a stopped client.</li>
+  <li><code class="highlighter-rouge">QoS</code> The maximum quality of service to subscribe each topic at. Messages published at a lower quality of service will be received at the published QoS. Messages published at a higher quality of service will be received using the QoS specified on the subscribe.</li>
+  <li><code class="highlighter-rouge">username</code> Sets the user name to use for the connection to the MQTT server. Leave it unset if the server does not require one; setting it to an empty value leads to errors.</li>
+  <li><code class="highlighter-rouge">password</code> Sets the password to use for the connection.</li>
+  <li><code class="highlighter-rouge">cleanSession</code> Setting this to true starts a clean session and removes all messages checkpointed by a previous run of this source. It is false by default.</li>
+  <li><code class="highlighter-rouge">connectionTimeout</code> Sets the connection timeout. A value of 0 means wait until the client connects. See <code class="highlighter-rouge">MqttConnectOptions.setConnectionTimeout</code> for more information.</li>
+  <li><code class="highlighter-rouge">keepAlive</code> Same as <code class="highlighter-rouge">MqttConnectOptions.setKeepAliveInterval</code>.</li>
+  <li><code class="highlighter-rouge">mqttVersion</code> Same as <code class="highlighter-rouge">MqttConnectOptions.setMqttVersion</code>.</li>
+</ul>
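<p>The QoS rule in the list above amounts to taking the lower of the two levels. The following Java sketch illustrates that rule only; <code class="highlighter-rouge">EffectiveQos</code> and <code class="highlighter-rouge">effectiveQos</code> are hypothetical names for this example and are not part of Bahir or the Paho client.</p>

```java
public class EffectiveQos {

    // Hypothetical helper: returns the QoS a message is delivered at, given the
    // QoS it was published with and the maximum QoS requested on the subscription.
    static int effectiveQos(int publishedQos, int subscribedQos) {
        if (publishedQos < 0 || publishedQos > 2 || subscribedQos < 0 || subscribedQos > 2) {
            throw new IllegalArgumentException("MQTT QoS levels are 0, 1 or 2");
        }
        // Lower published QoS wins; higher published QoS is capped at the subscribed QoS.
        return Math.min(publishedQos, subscribedQos);
    }

    public static void main(String[] args) {
        System.out.println(effectiveQos(0, 1)); // prints 0: published below the subscription level
        System.out.println(effectiveQos(2, 1)); // prints 1: capped at the subscription level
    }
}
```

<p>In other words, the <code class="highlighter-rouge">QoS</code> option acts as an upper bound on delivery guarantees and never raises the level a message was published with.</p>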
+<h2 id="examples">Examples</h2>
+<h3 id="scala-api">Scala API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>val lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topic)
+val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topic)
+</code></pre>
+</div>
<p>Additional MQTT connection options can be provided:</p>
+<pre><code class="language-Scala">val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+val lines = MQTTUtils.createPairedByteArrayStream(ssc, brokerUrl, topics, storageLevel, clientId, username, password, cleanSession, qos, connectionTimeout, keepAliveInterval, mqttVersion)
+</code></pre>
<h3 id="java-api">Java API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>JavaDStream&lt;String&gt; lines = MQTTUtils.createStream(jssc, brokerUrl, topic);
+JavaReceiverInputDStream&lt;Tuple2&lt;String, String&gt;&gt; lines = MQTTUtils.createPairedStream(jssc, brokerUrl, topics);
+JavaReceiverInputDStream&lt;Tuple2&lt;String, String&gt;&gt; lines = MQTTUtils.createPairedByteArrayStream(jssc, brokerUrl, topics);
+</code></pre>
+</div>
+<p>See end-to-end examples at <a href="">MQTT Examples</a></p>
+<h3 id="python-api">Python API</h3>
+<p>Create a DStream from a single topic.</p>
<div class="highlighter-rouge"><pre class="highlight"><code>MQTTUtils.createStream(ssc, broker_url, topic)
</code></pre>
</div>
<p>Create a DStream from a list of topics.</p>
<div class="highlighter-rouge"><pre class="highlight"><code>MQTTUtils.createPairedStream(ssc, broker_url, topics)
</code></pre>
</div>
</div>
+  </body>
<title>Spark Streaming Google Pub-Sub</title>
<body>
<div class="container">
+<!--<div class="hero-unit Spark Streaming Google Pub-Sub">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
<p>A library for reading data from <a href="">Google Cloud Pub/Sub</a> using Spark Streaming.</p>
<h2 id="linking">Linking</h2>
<p>Using SBT:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.1.3"
</code></pre>
</div>
<p>Using Maven:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
+    &lt;artifactId&gt;spark-streaming-pubsub_2.11&lt;/artifactId&gt;
+    &lt;version&gt;2.1.3&lt;/version&gt;
+&lt;/dependency&gt;
+</code></pre>
+</div>
<p>This library can also be added to Spark jobs launched through <code class="highlighter-rouge">spark-shell</code> or <code class="highlighter-rouge">spark-submit</code> by using the <code class="highlighter-rouge">--packages</code> command line option.
For example, to include it when starting the spark shell:</p>
<div class="highlighter-rouge"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-pubsub_2.11:2.1.3
</code></pre>
</div>
+<p>Unlike using <code class="highlighter-rouge">--jars</code>, using <code class="highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
+The <code class="highlighter-rouge">--packages</code> argument can also be used with <code class="highlighter-rouge">bin/spark-submit</code>.</p>
+<h2 id="examples">Examples</h2>
+<p>First, create a credential with <code class="highlighter-rouge">SparkGCPCredentials</code>. It supports four types of credentials:</p>
+<ul>
+  <li>Application default: <code class="highlighter-rouge"></code></li>
+  <li>JSON service account: <code class="highlighter-rouge">SparkGCPCredentials.builder.jsonServiceAccount(PATH_TO_JSON_KEY).build()</code></li>
+  <li>P12 service account: <code class="highlighter-rouge">SparkGCPCredentials.builder.p12ServiceAccount(PATH_TO_P12_KEY, EMAIL_ACCOUNT).build()</code></li>
+  <li>Metadata service account (when running on Dataproc): <code class="highlighter-rouge">SparkGCPCredentials.builder.metadataServiceAccount().build()</code></li>
+</ul>
+<h3 id="scala-api">Scala API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>val lines = PubsubUtils.createStream(ssc, projectId, subscriptionName, credential, ..)
+</code></pre>
+</div>
+<h3 id="java-api">Java API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>JavaDStream&lt;SparkPubsubMessage&gt; lines = PubsubUtils.createStream(jssc, projectId, subscriptionName, credential...)
+</code></pre>
+</div>
+<p>See end-to-end examples at <a href="streaming-pubsub/examples">Google Cloud Pubsub Examples</a></p>
+<h3 id="unit-test">Unit Test</h3>
+<p>To run the PubSub test cases, you need to generate <strong>Google API service account key files</strong> and set the corresponding environment variable to enable the test.</p>
+<h4 id="to-generate-a-service-account-key-file-with-pubsub-permission">To generate a service account key file with PubSub permission</h4>
+<ol>
+  <li>Go to the <a href="">Google API Console</a>.</li>
+  <li>Choose the <code class="highlighter-rouge">Credentials</code> tab &gt; <code class="highlighter-rouge">Create credentials</code> button &gt; <code class="highlighter-rouge">Service account key</code>.</li>
+  <li>Fill in the account name, assign <code class="highlighter-rouge">Role&gt; Pub/Sub&gt; Pub/Sub Editor</code>, and check the option <code class="highlighter-rouge">Furnish a private key</code> to create a key. You need to create two keys: one as a JSON key file and another as a P12 key file.</li>
+  <li>The account email is the <code class="highlighter-rouge">Service account ID</code>.</li>
+</ol>
+<h4 id="setting-the-environment-variables-and-run-test">Setting the environment variables and running the tests</h4>
+<div class="highlighter-rouge"><pre class="highlight"><code>mvn clean package -DskipTests -pl streaming-pubsub
+export GCP_TEST_JSON_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-1234abcd.json
+export GCP_TEST_P12_KEY_PATH=/path/to/pubsub/credential/files/Apache-Bahir-PubSub-5678efgh.p12
+mvn test -pl streaming-pubsub
+</code></pre>
+</div>
+  </div>
+  </body>
+  <body>
+    <div class="container">
+<!--<div class="hero-unit Spark Streaming Twitter">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
+<p>A library for reading social data from <a href="">Twitter</a> using Spark Streaming.</p>
+<h2 id="linking">Linking</h2>
+<p>Using SBT:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.3"
+</code></pre>
+</div>
+<p>Using Maven:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
+    &lt;artifactId&gt;spark-streaming-twitter_2.11&lt;/artifactId&gt;
+    &lt;version&gt;2.1.3&lt;/version&gt;
+&lt;/dependency&gt;
+</code></pre>
+</div>
+<p>This library can also be added to Spark jobs launched through <code class="highlighter-rouge">spark-shell</code> or <code class="highlighter-rouge">spark-submit</code> by using the <code class="highlighter-rouge">--packages</code> command line option.
+For example, to include it when starting the spark shell:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-twitter_2.11:2.1.3
+</code></pre>
+</div>
+<p>Unlike using <code class="highlighter-rouge">--jars</code>, using <code class="highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
+The <code class="highlighter-rouge">--packages</code> argument can also be used with <code class="highlighter-rouge">bin/spark-submit</code>.</p>
+<p>This library is cross-published for Scala 2.10 and Scala 2.11, so substitute the Scala version you are using (2.10 or 2.11) in the artifact names and commands listed above.</p>
+<h2 id="examples">Examples</h2>
+<p><code class="highlighter-rouge">TwitterUtils</code> uses Twitter4J to get the public stream of tweets using <a href="">Twitter’s Streaming API</a>. Authentication information
+can be provided by any of the <a href="">methods</a> supported by the Twitter4J library. You can import the <code class="highlighter-rouge">TwitterUtils</code> class and create a DStream with <code class="highlighter-rouge">TwitterUtils.createStream</code> as shown below.</p>
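<p>One of the configuration methods Twitter4J supports is reading OAuth credentials from JVM system properties. The following Java sketch sets those properties before a stream is created. The class and method names (<code class="highlighter-rouge">TwitterAuthProperties</code>, <code class="highlighter-rouge">oauthProperties</code>, <code class="highlighter-rouge">apply</code>) and the credential values are hypothetical placeholders; only the <code class="highlighter-rouge">twitter4j.oauth.*</code> property names come from Twitter4J.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TwitterAuthProperties {

    // Hypothetical helper: maps the four OAuth values onto the system property
    // names that Twitter4J reads its configuration from.
    static Map<String, String> oauthProperties(String consumerKey, String consumerSecret,
                                               String accessToken, String accessTokenSecret) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("twitter4j.oauth.consumerKey", consumerKey);
        props.put("twitter4j.oauth.consumerSecret", consumerSecret);
        props.put("twitter4j.oauth.accessToken", accessToken);
        props.put("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
        return props;
    }

    // Copies every entry into the JVM system properties.
    static void apply(Map<String, String> props) {
        props.forEach(System::setProperty);
    }

    public static void main(String[] args) {
        // Placeholder values; real credentials come from your Twitter application settings.
        apply(oauthProperties("my-consumer-key", "my-consumer-secret",
                              "my-access-token", "my-access-token-secret"));
        System.out.println(System.getProperty("twitter4j.oauth.consumerKey"));
        // prints my-consumer-key
    }
}
```

<p>With the properties set this way, no explicit authorization object needs to be passed when the stream is created, on the assumption that Twitter4J picks up its default configuration from them.</p>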
+<h3 id="scala-api">Scala API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>import org.apache.spark.streaming.twitter._
+
+TwitterUtils.createStream(ssc, None)
+</code></pre>
+</div>
+<h3 id="java-api">Java API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>import org.apache.spark.streaming.twitter.*;
+</code></pre>
+</div>
+<p>You can get either the public stream or a stream filtered by keywords.
+See end-to-end examples at <a href="">Twitter Examples</a>.</p>
+  </div>
+  </body>
+  <body>
+    <div class="container">
+<!--<div class="hero-unit Spark Streaming ZeroMQ">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
+<p>A library for reading data from <a href="">ZeroMQ</a> using Spark Streaming.</p>
+<h2 id="linking">Linking</h2>
+<p>Using SBT:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>libraryDependencies += "org.apache.bahir" %% "spark-streaming-zeromq" % "2.1.3"
+</code></pre>
+</div>
+<p>Using Maven:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>&lt;dependency&gt;
+    &lt;groupId&gt;org.apache.bahir&lt;/groupId&gt;
+    &lt;artifactId&gt;spark-streaming-zeromq_2.11&lt;/artifactId&gt;
+    &lt;version&gt;2.1.3&lt;/version&gt;
+&lt;/dependency&gt;
+</code></pre>
+</div>
+<p>This library can also be added to Spark jobs launched through <code class="highlighter-rouge">spark-shell</code> or <code class="highlighter-rouge">spark-submit</code> by using the <code class="highlighter-rouge">--packages</code> command line option.
+For example, to include it when starting the spark shell:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>$ bin/spark-shell --packages org.apache.bahir:spark-streaming-zeromq_2.11:2.1.3
+</code></pre>
+</div>
+<p>Unlike using <code class="highlighter-rouge">--jars</code>, using <code class="highlighter-rouge">--packages</code> ensures that this library and its dependencies will be added to the classpath.
+The <code class="highlighter-rouge">--packages</code> argument can also be used with <code class="highlighter-rouge">bin/spark-submit</code>.</p>
+<p>This library is cross-published for Scala 2.10 and Scala 2.11, so substitute the Scala version you are using (2.10 or 2.11) in the artifact names and commands listed above.</p>
+<h2 id="examples">Examples</h2>
+<h3 id="scala-api">Scala API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>val lines = ZeroMQUtils.createStream(ssc, ...)
+</code></pre>
+</div>
+<h3 id="java-api">Java API</h3>
+<div class="highlighter-rouge"><pre class="highlight"><code>JavaDStream&lt;String&gt; lines = ZeroMQUtils.createStream(jssc, ...);
+</code></pre>
+</div>
+<p>See end-to-end examples at <a href="">ZeroMQ Examples</a></p>
+  </div>
+  </body>
+  <body>
+    <div class="container">
+<!--<div class="hero-unit Extensions for Apache Spark">
+  <h1></h1>
+<div class="row">
+  <div class="col-md-12">
+    <!--
+<h3 id="apache-bahir-extensions-for-apache-spark">Apache Bahir Extensions for Apache Spark</h3>
+<p><br /></p>
+<h4 id="sql--data-sources">SQL Data Sources</h4>
+<p><a href="../spark-sql-cloudant">Apache CouchDB/Cloudant data source</a></p>
+<p><br /></p>
+<h4 id="structured-streaming-data-sources">Structured Streaming Data Sources</h4>
+<p><a href="../spark-sql-streaming-akka">Akka data source</a></p>
+<p><a href="../spark-sql-streaming-mqtt">MQTT data source</a></p>
+<p><br /></p>
+<h4 id="discretized-streams-dstreams-connectors">Discretized Streams (DStreams) Connectors</h4>
+<p><a href="../spark-sql-cloudant">Apache CouchDB/Cloudant connector</a></p>
+<p><a href="../spark-streaming-akka">Akka connector</a></p>
+<p><a href="../spark-streaming-pubsub">Google Cloud Pub/Sub connector</a></p>
+<p><a href="../spark-streaming-mqtt">MQTT connector</a></p>
+<p><a href="../spark-streaming-twitter">Twitter connector</a></p>
+<p><a href="../spark-streaming-zeromq">ZeroMQ connector</a></p>
+  </div>
