You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/08/06 17:24:25 UTC
bahir git commit: [BAHIR-42] Refactor sql-streaming-mqtt scala example
Repository: bahir
Updated Branches:
refs/heads/master 4a993afaa -> 1abeab29c
[BAHIR-42] Refactor sql-streaming-mqtt scala example
Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/1abeab29
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/1abeab29
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/1abeab29
Branch: refs/heads/master
Commit: 1abeab29c8a5e884f4603ef12abd85971a9105b0
Parents: 4a993af
Author: Luciano Resende <lr...@apache.org>
Authored: Sat Aug 6 20:24:01 2016 +0300
Committer: Luciano Resende <lr...@apache.org>
Committed: Sat Aug 6 20:24:01 2016 +0300
----------------------------------------------------------------------
.../streaming/mqtt/MQTTStreamWordCount.scala | 73 ++++++++++++++++++++
.../mqtt/examples/MQTTStreamWordCount.scala | 73 --------------------
2 files changed, 73 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
new file mode 100644
index 0000000..237a8fa
--- /dev/null
+++ b/sql-streaming-mqtt/examples/src/main/scala/org/apache/bahir/examples/sql/streaming/mqtt/MQTTStreamWordCount.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bahir.examples.sql.streaming.mqtt
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server.
+ *
+ * Usage: MQTTStreamWordCount <brokerUrl> <topic>
+ * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, a MQTT Server should be up and running.
+ *
+ */
+object MQTTStreamWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println
+ System.exit(1)
+ }
+
+ val brokerUrl = args(0)
+ val topic = args(1)
+
+ val spark = SparkSession
+ .builder
+ .appName("MQTTStreamWordCount")
+ .master("local[4]")
+ .getOrCreate()
+
+ import spark.implicits._
+
+ // Create DataFrame representing the stream of input lines from connection to mqtt server
+ val lines = spark.readStream
+ .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
+ .option("topic", topic)
+ .load(brokerUrl).as[(String, Timestamp)]
+
+ // Split the lines into words
+ val words = lines.map(_._1).flatMap(_.split(" "))
+
+ // Generate running word count
+ val wordCounts = words.groupBy("value").count()
+
+ // Start running the query that prints the running counts to the console
+ val query = wordCounts.writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
+
+ query.awaitTermination()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/bahir/blob/1abeab29/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
----------------------------------------------------------------------
diff --git a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala b/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
deleted file mode 100644
index c792858..0000000
--- a/sql-streaming-mqtt/src/main/scala/org/apache/bahir/sql/streaming/mqtt/examples/MQTTStreamWordCount.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bahir.sql.streaming.mqtt.examples
-
-import java.sql.Timestamp
-
-import org.apache.spark.sql.SparkSession
-
-/**
- * Counts words in UTF8 encoded, '\n' delimited text received from MQTT Server.
- *
- * Usage: MQTTStreamWordCount <brokerUrl> <topic>
- * <brokerUrl> and <topic> describe the MQTT server that Structured Streaming
- * would connect to receive data.
- *
- * To run this on your local machine, a MQTT Server should be up and running.
- *
- */
-object MQTTStreamWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: MQTTStreamWordCount <brokerUrl> <topic>") // scalastyle:off println
- System.exit(1)
- }
-
- val brokerUrl = args(0)
- val topic = args(1)
-
- val spark = SparkSession
- .builder
- .appName("MQTTStreamWordCount")
- .master("local[4]")
- .getOrCreate()
-
- import spark.implicits._
-
- // Create DataFrame representing the stream of input lines from connection to mqtt server
- val lines = spark.readStream
- .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
- .option("topic", topic)
- .load(brokerUrl).as[(String, Timestamp)]
-
- // Split the lines into words
- val words = lines.map(_._1).flatMap(_.split(" "))
-
- // Generate running word count
- val wordCounts = words.groupBy("value").count()
-
- // Start running the query that prints the running counts to the console
- val query = wordCounts.writeStream
- .outputMode("complete")
- .format("console")
- .start()
-
- query.awaitTermination()
- }
-}
-