You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bahir.apache.org by lr...@apache.org on 2016/06/10 15:23:54 UTC

[15/50] [abbrv] bahir git commit: [SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values

[SPARK-9057][STREAMING] Twitter example joining to static RDD of word sentiment values

Example of joining a static RDD of word sentiments to a streaming RDD of Tweets in order to demo the usage of the transform() method.

Author: Jeff L <sh...@alumni.carnegiemellon.edu>

Closes #8431 from Agent007/SPARK-9057.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/c6155756
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/c6155756
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/c6155756

Branch: refs/heads/master
Commit: c61557569466855728f41c8180bbde8a78e2fb20
Parents: c25b799
Author: Jeff L <sh...@alumni.carnegiemellon.edu>
Authored: Fri Dec 18 15:06:54 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Dec 18 15:06:54 2015 +0000

----------------------------------------------------------------------
 .../JavaTwitterHashTagJoinSentiments.java       | 180 +++++++++++++++++++
 .../twitter/TwitterHashTagJoinSentiments.scala  |  96 ++++++++++
 2 files changed, 276 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/c6155756/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java
----------------------------------------------------------------------
diff --git a/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java b/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java
new file mode 100644
index 0000000..030ee30
--- /dev/null
+++ b/streaming-twitter/examples/src/main/java/org/apache/spark/examples/streaming/twitter/JavaTwitterHashTagJoinSentiments.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.twitter.TwitterUtils;
+import scala.Tuple2;
+import twitter4j.Status;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
+ * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN).
+ *
+ * <p>Usage: {@code JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>
+ * <access token> <access token secret> [<filters>]}
+ *
+ * <p>The word list is read from the relative path {@code data/streaming/AFINN-111.txt}; each
+ * line is parsed as a word and a numeric sentiment value separated by a tab character.
+ */
+public class JavaTwitterHashTagJoinSentiments {
+
+  /**
+   * Builds the streaming pipeline, starts it and blocks until the streaming context terminates.
+   * For every RDD of the final stream it prints up to ten hash tags together with their
+   * happiness score (sentiment value multiplied by the hash tag's count in the 10 second window),
+   * highest score first.
+   *
+   * @param args the four Twitter OAuth credentials (consumer key, consumer secret, access token,
+   *             access token secret), optionally followed by filter terms that are passed
+   *             through to {@code TwitterUtils.createStream}
+   * @throws IOException kept from the original signature for compatibility
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length < 4) {
+      System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
+        " <access token> <access token secret> [<filters>]");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    String consumerKey = args[0];
+    String consumerSecret = args[1];
+    String accessToken = args[2];
+    String accessTokenSecret = args[3];
+    // Everything after the four credentials is treated as a filter term (may be empty)
+    String[] filters = Arrays.copyOfRange(args, 4, args.length);
+
+    // Set the system properties so that Twitter4j library used by Twitter stream
+    // can use them to generate OAuth credentials
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
+    System.setProperty("twitter4j.oauth.accessToken", accessToken);
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
+
+    SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterHashTagJoinSentiments");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+    JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(jssc, filters);
+
+    // Split every tweet text on single spaces
+    JavaDStream<String> words = stream.flatMap(new FlatMapFunction<Status, String>() {
+      @Override
+      public Iterable<String> call(Status s) {
+        return Arrays.asList(s.getText().split(" "));
+      }
+    });
+
+    // Keep only the words that look like hash tags
+    JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
+      @Override
+      public Boolean call(String word) throws Exception {
+        return word.startsWith("#");
+      }
+    });
+
+    // Read in the word-sentiment list and create a static RDD from it. The RDD is cached
+    // because it is joined again for every batch; without caching the file would be re-read
+    // and re-parsed each time.
+    String wordSentimentFilePath = "data/streaming/AFINN-111.txt";
+    final JavaPairRDD<String, Double> wordSentiments = jssc.sparkContext()
+      .textFile(wordSentimentFilePath)
+      .mapToPair(new PairFunction<String, String, Double>(){
+        @Override
+        public Tuple2<String, Double> call(String line) {
+          String[] columns = line.split("\t");
+          return new Tuple2<String, Double>(columns[0],
+            Double.parseDouble(columns[1]));
+        }
+      })
+      .cache();
+
+    JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          // leave out the # character
+          return new Tuple2<String, Integer>(s.substring(1), 1);
+        }
+      });
+
+    // Sum the per-hash-tag counts over a 10 second window
+    JavaPairDStream<String, Integer> hashTagTotals = hashTagCount.reduceByKeyAndWindow(
+      new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer a, Integer b) {
+          return a + b;
+        }
+      }, new Duration(10000));
+
+    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
+    // with the static RDD inside the transform() method and then multiplying
+    // the frequency of the hash tag by its sentiment value
+    JavaPairDStream<String, Tuple2<Double, Integer>> joinedTuples =
+      hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
+        JavaPairRDD<String, Tuple2<Double, Integer>>>() {
+        @Override
+        public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
+          Integer> topicCount)
+          throws Exception {
+          return wordSentiments.join(topicCount);
+        }
+      });
+
+    JavaPairDStream<String, Double> topicHappiness = joinedTuples.mapToPair(
+      new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
+        @Override
+        public Tuple2<String, Double> call(Tuple2<String,
+          Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
+          Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
+          return new Tuple2<String, Double>(topicAndTuplePair._1(),
+            happinessAndCount._1() * happinessAndCount._2());
+        }
+      });
+
+    // Swap to (happiness, topic) so that the stream can be sorted by happiness
+    JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
+      new PairFunction<Tuple2<String, Double>, Double, String>() {
+        @Override
+        public Tuple2<Double, String> call(Tuple2<String, Double> topicAndHappiness)
+          throws Exception {
+          return new Tuple2<Double, String>(topicAndHappiness._2(),
+            topicAndHappiness._1());
+        }
+      });
+
+    // Sort descending by happiness; only the first ten entries are printed below
+    JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
+      new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
+        @Override
+        public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
+          String> happinessAndTopics) throws Exception {
+          return happinessAndTopics.sortByKey(false);
+        }
+      }
+    );
+
+    // Print hash tags with the most positive sentiment values
+    happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
+      @Override
+      public Void call(JavaPairRDD<Double, String> sortedPairs) throws Exception {
+        List<Tuple2<Double, String>> topList = sortedPairs.take(10);
+        System.out.println(
+          String.format("\nHappiest topics in last 10 seconds (%s total):",
+            sortedPairs.count()));
+        for (Tuple2<Double, String> pair : topList) {
+          System.out.println(
+            String.format("%s (%s happiness)", pair._2(), pair._1()));
+        }
+        return null;
+      }
+    });
+
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/bahir/blob/c6155756/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterHashTagJoinSentiments.scala
----------------------------------------------------------------------
diff --git a/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterHashTagJoinSentiments.scala b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterHashTagJoinSentiments.scala
new file mode 100644
index 0000000..0328fa8
--- /dev/null
+++ b/streaming-twitter/examples/src/main/scala/org/apache/spark/examples/streaming/twitter/TwitterHashTagJoinSentiments.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.twitter.TwitterUtils
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Displays the most positive hash tags by joining the streaming Twitter data with a static RDD of
+ * the AFINN word list (http://neuro.imm.dtu.dk/wiki/AFINN)
+ *
+ * Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret>
+ *   <access token> <access token secret> [<filters>]
+ *
+ * The word list is read from the relative path data/streaming/AFINN-111.txt; each line is
+ * parsed as a word and an integer sentiment value separated by a tab character.
+ */
+object TwitterHashTagJoinSentiments {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 4) {
+      System.err.println("Usage: TwitterHashTagJoinSentiments <consumer key> <consumer secret> " +
+        "<access token> <access token secret> [<filters>]")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+    // Everything after the four credentials is treated as a filter term (may be empty)
+    val filters = args.takeRight(args.length - 4)
+
+    // Set the system properties so that Twitter4j library used by Twitter stream
+    // can use them to generate OAuth credentials
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
+    val sparkConf = new SparkConf().setAppName("TwitterHashTagJoinSentiments")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    val stream = TwitterUtils.createStream(ssc, None, filters)
+
+    // Split every tweet text on single spaces and keep the words that look like hash tags
+    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
+
+    // Read in the word-sentiment list and create a static RDD from it. The sentiment value is
+    // parsed to Int so that the multiplication below is arithmetic; on a String, `*` would
+    // repeat the text instead. The RDD is cached because every batch joins against it.
+    val wordSentimentFilePath = "data/streaming/AFINN-111.txt"
+    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
+      val Array(word, happinessValue) = line.split("\t")
+      (word, happinessValue.toInt)
+    }.cache()
+
+    // Determine the hash tags with the highest sentiment values by joining the streaming RDD
+    // with the static RDD inside the transform() method and then multiplying
+    // the frequency of the hash tag by its sentiment value
+    val happiest60 = hashTags.map(hashTag => (hashTag.tail, 1))
+      .reduceByKeyAndWindow(_ + _, Seconds(60))
+      .transform{topicCount => wordSentiments.join(topicCount)}
+      .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+      .map{case (topic, happinessValue) => (happinessValue, topic)}
+      .transform(_.sortByKey(false))
+
+    // Same pipeline over a 10 second window
+    val happiest10 = hashTags.map(hashTag => (hashTag.tail, 1))
+      .reduceByKeyAndWindow(_ + _, Seconds(10))
+      .transform{topicCount => wordSentiments.join(topicCount)}
+      .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}
+      .map{case (topic, happinessValue) => (happinessValue, topic)}
+      .transform(_.sortByKey(false))
+
+    // Print hash tags with the most positive sentiment values
+    happiest60.foreachRDD(rdd => {
+      val topList = rdd.take(10)
+      println("\nHappiest topics in last 60 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+    })
+
+    happiest10.foreachRDD(rdd => {
+      val topList = rdd.take(10)
+      println("\nHappiest topics in last 10 seconds (%s total):".format(rdd.count()))
+      topList.foreach{case (happiness, tag) => println("%s (%s happiness)".format(tag, happiness))}
+    })
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+// scalastyle:on println