You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/06 17:24:25 UTC

bahir git commit: [BAHIR-42] Refactor sql-streaming-mqtt scala example

Repository: bahir
Updated Branches:
  refs/heads/master 4a993afaa -> 1abeab29c


[BAHIR-42] Refactor sql-streaming-mqtt scala example


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/1abeab29
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/1abeab29
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/1abeab29

Branch: refs/heads/master
Commit: 1abeab29c8a5e884f4603ef12abd85971a9105b0
Parents: 4a993af
Author: Luciano Resende <lr...@apache.org>
Authored: Sat Aug 6 20:24:01 2016 +0300
Committer: Luciano Resende <lr...@apache.org>
Committed: Sat Aug 6 20:24:01 2016 +0300

----------------------------------------------------------------------
 .../streaming/mqtt/MQTTStreamWordCount.scala    | 73 ++++++++++++++++++++
 .../mqtt/examples/MQTTStreamWordCount.scala     | 73 --------------------
 2 files changed, 73 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
new file mode 100644
index 0000000..237a8fa
--- /dev/null
+++ b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.mqtt
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server.
+ *
+ * Usage: MQTTStreamWordCount <brokerUrl> <topic>
+ * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, a MQTT Server should be up and running.
+ *
+ */
+object MQTTStreamWordCount  {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println
+      System.exit(1)
+    }
+
+    val brokerUrl = args(0)
+    val topic = args(1)
+
+    val spark = SparkSession
+      .builder
+      .appName("MQTTStreamWordCount")
+      .master("local[4]")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to mqtt server
+    val lines = spark.readStream
+      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+      .option("topic", topic)
+      .load(brokerUrl).as[(String, Timestamp)]
+
+    // Split the lines into words
+    val words = lines.map(_._1).flatMap(_.split(" "))
+
+    // Generate running word count
+    val wordCounts = words.groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    query.awaitTermination()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
deleted file mode 100644
index c792858..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.sql.streaming.mqtt.examples
-
-import java.sql.Timestamp
-
-import org.apache.spark.sql.SparkSession
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server.
- *
- * Usage: MQTTStreamWordCount <brokerUrl> <topic>
- * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming
- * would connect to receive data.
- *
- * To run this on your local machine, a MQTT Server should be up and running.
- *
- */
-object MQTTStreamWordCount  {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println
-      System.exit(1)
-    }
-
-    val brokerUrl = args(0)
-    val topic = args(1)
-
-    val spark = SparkSession
-      .builder
-      .appName("MQTTStreamWordCount")
-      .master("local[4]")
-      .getOrCreate()
-
-    import spark.implicits._
-
-    // Create DataFrame representing the stream of input lines from connection to mqtt server
-    val lines = spark.readStream
-      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
-      .option("topic", topic)
-      .load(brokerUrl).as[(String, Timestamp)]
-
-    // Split the lines into words
-    val words = lines.map(_._1).flatMap(_.split(" "))
-
-    // Generate running word count
-    val wordCounts = words.groupBy("value").count()
-
-    // Start running the query that prints the running counts to the console
-    val query = wordCounts.writeStream
-      .outputMode("complete")
-      .format("console")
-      .start()
-
-    query.awaitTermination()
-  }
-}
-