You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/04 15:50:06 UTC
[2/2] flink git commit: [streaming] Removed StockPrices example
[streaming] Removed StockPrices example
The purpose of this example was to accompany a blog post.
It has been moved to another branch and removed from master.
Closes #777
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39010dce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39010dce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39010dce
Branch: refs/heads/master
Commit: 39010dce426936f66a3f9440e77d5c4edadd7bd9
Parents: 23a3646
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 10:11:42 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Thu Jun 4 15:48:48 2015 +0200
----------------------------------------------------------------------
.../flink-streaming-examples/pom.xml | 23 -
.../examples/windowing/StockPrices.java | 438 -------------------
.../scala/examples/join/WindowJoin.scala | 3 +-
.../scala/examples/windowing/StockPrices.scala | 228 ----------
4 files changed, 2 insertions(+), 690 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index 2ebd606..6c75dc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -314,29 +314,6 @@ under the License.
</configuration>
</execution>
- <!-- StockPrices -->
- <execution>
- <id>StockPrices</id>
- <phase>package</phase>
- <goals>
- <goal>jar</goal>
- </goals>
- <configuration>
- <classifier>StockPrices</classifier>
-
- <archive>
- <manifestEntries>
- <program-class>org.apache.flink.streaming.examples.windowing.StockPrices</program-class>
- </manifestEntries>
- </archive>
-
- <includes>
- <include>org/apache/flink/streaming/examples/windowing/StockPrices.class</include>
- <include>org/apache/flink/streaming/examples/windowing/StockPrices$*.class</include>
- </includes>
- </configuration>
- </execution>
-
<!-- TopSpeedWindowing -->
<execution>
<id>TopSpeedWindowing</id>
http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
deleted file mode 100644
index 56abb12..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
-
-/**
- * This example showcases a moderately complex Flink Streaming pipeline.
- * It to computes statistics on stock market data that arrive continuously,
- * and combines the stock market data with tweet streams.
- * For a detailed explanation of the job, check out the blog post unrolling it.
- * To run the example make sure that the service providing the text data
- * is already up and running.
- *
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- *
- *
- * <p>
- * Usage:
- * <code>StockPrices <hostname> <port> <result path></code>
- * <br>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>union and join data streams,
- * <li>use different windowing policies,
- * <li>define windowing aggregations.
- * </ul>
- *
- * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- * @see <a href="http://flink.apache.org/news/2015/02/09/streaming-example.html">blogpost</a>
- */
-public class StockPrices {
-
- private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));
- private static final Double DEFAULT_PRICE = 1000.;
- private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- @SuppressWarnings({ "serial", "unused" })
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- //Step 1
- //Read a stream of stock prices from different sources and union it into one stream
-
- //Read from a socket stream at map it to StockPrice objects
- DataStream<StockPrice> socketStockStream = env.socketTextStream(hostName, port)
- .map(new MapFunction<String, StockPrice>() {
- private String[] tokens;
-
- @Override
- public StockPrice map(String value) throws Exception {
- tokens = value.split(",");
- return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
- }
- });
-
- //Generate other stock streams
- DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
- DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
- DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
- DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
-
- //Merge all stock streams together
- @SuppressWarnings("unchecked")
- DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
-
- //Step 2
- //Compute some simple statistics on a rolling window
- WindowedDataStream<StockPrice> windowedStream = stockStream
- .window(Time.of(10, TimeUnit.SECONDS))
- .every(Time.of(5, TimeUnit.SECONDS));
-
- DataStream<StockPrice> lowest = windowedStream.minBy("price").flatten();
- DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price").flatten();
- DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").mapWindow(new WindowMean()).flatten();
-
- //Step 3
- //Use delta policy to create price change warnings, and also count the number of warning every half minute
-
- DataStream<String> priceWarnings = stockStream.groupBy("symbol")
- .window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
- @Override
- public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
- return Math.abs(oldDataPoint.price - newDataPoint.price);
- }
- }, DEFAULT_STOCK_PRICE))
- .mapWindow(new SendWarning()).flatten();
-
-
- DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
- @Override
- public Count map(String value) throws Exception {
- return new Count(value, 1);
- }
- }).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();
-
- //Step 4
- //Read a stream of tweets and extract the stock symbols
- DataStream<String> tweetStream = env.addSource(new TweetSource());
-
- DataStream<String> mentionedSymbols = tweetStream.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String value, Collector<String> out) throws Exception {
- String[] words = value.split(" ");
- for (String word : words) {
- out.collect(word.toUpperCase());
- }
- }
- }).filter(new FilterFunction<String>() {
- @Override
- public boolean filter(String value) throws Exception {
- return SYMBOLS.contains(value);
- }
- });
-
- DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
- @Override
- public Count map(String value) throws Exception {
- return new Count(value, 1);
- }
- }).groupBy("symbol")
- .window(Time.of(30, TimeUnit.SECONDS))
- .sum("count").flatten();
-
- //Step 5
- //For advanced analysis we join the number of tweets and the number of price change warnings by stock
- //for the last half minute, we keep only the counts. We use this information to compute rolling correlations
- //between the tweets and the price changes
-
- DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
- .onWindow(30, TimeUnit.SECONDS)
- .where("symbol")
- .equalTo("symbol")
- .with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
- @Override
- public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
- return new Tuple2<Integer, Integer>(first.count, second.count);
- }
- });
-
- DataStream<Double> rollingCorrelation = tweetsAndWarning
- .window(Time.of(30, TimeUnit.SECONDS))
- .mapWindow(new WindowCorrelation()).flatten();
-
- if (fileOutput) {
- rollingCorrelation.writeAsText(outputPath, 1);
- } else {
- rollingCorrelation.print();
- }
-
- env.execute("Stock stream");
-
- }
-
- // *************************************************************************
- // DATA TYPES
- // *************************************************************************
-
- public static class StockPrice implements Serializable {
-
- private static final long serialVersionUID = 1L;
- public String symbol;
- public Double price;
-
- public StockPrice() {
- }
-
- public StockPrice(String symbol, Double price) {
- this.symbol = symbol;
- this.price = price;
- }
-
- @Override
- public String toString() {
- return "StockPrice{" +
- "symbol='" + symbol + '\'' +
- ", count=" + price +
- '}';
- }
- }
-
- public static class Count implements Serializable{
-
- private static final long serialVersionUID = 1L;
- public String symbol;
- public Integer count;
-
- public Count() {
- }
-
- public Count(String symbol, Integer count) {
- this.symbol = symbol;
- this.count = count;
- }
-
- @Override
- public String toString() {
- return "Count{" +
- "symbol='" + symbol + '\'' +
- ", count=" + count +
- '}';
- }
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- public final static class StockSource extends RichSourceFunction<StockPrice> {
-
- private static final long serialVersionUID = 1L;
- private Double price;
- private String symbol;
- private Integer sigma;
- private transient Random random;
-
-
-
- public StockSource(String symbol, Integer sigma) {
- this.symbol = symbol;
- this.sigma = sigma;
- price = DEFAULT_PRICE;
-
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- random = new Random();
-
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public StockPrice next() throws Exception {
- price = price + random.nextGaussian() * sigma;
- Thread.sleep(random.nextInt(200));
- return new StockPrice(symbol, price);
- }
-
- }
-
- public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
-
- private static final long serialVersionUID = 1L;
- private Double sum = 0.0;
- private Integer count = 0;
- private String symbol = "";
-
- @Override
- public void mapWindow(Iterable<StockPrice> values, Collector<StockPrice> out) throws Exception {
- if (values.iterator().hasNext()) {
-
- for (StockPrice sp : values) {
- sum += sp.price;
- symbol = sp.symbol;
- count++;
- }
- out.collect(new StockPrice(symbol, sum / count));
- }
- }
- }
-
- public static final class TweetSource extends RichSourceFunction<String> {
-
- private static final long serialVersionUID = 1L;
- private transient Random random;
- private transient StringBuilder stringBuilder;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- random = new Random();
- stringBuilder = new StringBuilder();
- }
-
- @Override
- public boolean reachedEnd() throws Exception {
- return false;
- }
-
- @Override
- public String next() throws Exception {
- stringBuilder.setLength(0);
- for (int i = 0; i < 3; i++) {
- stringBuilder.append(" ");
- stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
- }
- Thread.sleep(500);
- return stringBuilder.toString();
- }
-
- }
-
- public static final class SendWarning implements WindowMapFunction<StockPrice, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void mapWindow(Iterable<StockPrice> values, Collector<String> out) throws Exception {
- if (values.iterator().hasNext()) {
- out.collect(values.iterator().next().symbol);
- }
- }
- }
-
- public static final class WindowCorrelation implements WindowMapFunction<Tuple2<Integer, Integer>, Double> {
-
- private static final long serialVersionUID = 1L;
- private Integer leftSum;
- private Integer rightSum;
- private Integer count;
-
- private Double leftMean;
- private Double rightMean;
-
- private Double cov;
- private Double leftSd;
- private Double rightSd;
-
- @Override
- public void mapWindow(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) throws Exception {
-
- leftSum = 0;
- rightSum = 0;
- count = 0;
-
- cov = 0.;
- leftSd = 0.;
- rightSd = 0.;
-
- //compute mean for both sides, save count
- for (Tuple2<Integer, Integer> pair : values) {
- leftSum += pair.f0;
- rightSum += pair.f1;
- count++;
- }
-
- leftMean = leftSum.doubleValue() / count;
- rightMean = rightSum.doubleValue() / count;
-
- //compute covariance & std. deviations
- for (Tuple2<Integer, Integer> pair : values) {
- cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
- }
-
- for (Tuple2<Integer, Integer> pair : values) {
- leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
- rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
- }
- leftSd = Math.sqrt(leftSd);
- rightSd = Math.sqrt(rightSd);
-
- out.collect(cov / (leftSd * rightSd));
- }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String hostName;
- private static int port;
- private static String outputPath;
-
- private static boolean parseParameters(String[] args) {
-
- // parse input arguments
- if (args.length == 3) {
- fileOutput = true;
- hostName = args[0];
- port = Integer.valueOf(args[1]);
- outputPath = args[2];
- } else if (args.length == 2) {
- hostName = args[0];
- port = Integer.valueOf(args[1]);
- } else {
- System.err.println("Usage: StockPrices <hostname> <port> [<output path>]");
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 9032abd..239f1fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -121,7 +121,8 @@ object WindowJoin {
salariesPath = args(1)
outputPath = args(2)
} else {
- System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " + "<result path>")
+ System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> " +
+ "<input path 2> <result path>")
return false
}
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
deleted file mode 100644
index 4940c6c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import java.util.concurrent.TimeUnit._
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.windowing.Delta
-import org.apache.flink.streaming.api.windowing.helper.Time
-import org.apache.flink.util.Collector
-
-import scala.util.Random
-
-/**
- * This example showcases a moderately complex Flink Streaming pipeline.
- * It to computes statistics on stock market data that arrive continuously,
- * and combines the stock market data with tweet streams.
- * For a detailed explanation of the job, check out the
- * [[http://flink.apache.org/news/2015/02/09/streaming-example.html blog post]]
- * unrolling it. To run the example make sure that the service providing
- * the text data is already up and running.
- *
- * To start an example socket text stream on your local machine run netcat
- * from a command line, where the parameter specifies the port number:
- *
- * {{{
- * nc -lk 9999
- * }}}
- *
- * Usage:
- * {{{
- * StockPrices <hostname> <port> <output path>
- * }}}
- *
- * This example shows how to:
- *
- * - union and join data streams,
- * - use different windowing policies,
- * - define windowing aggregations.
- */
-object StockPrices {
-
- case class StockPrice(symbol: String, price: Double)
- case class Count(symbol: String, count: Int)
-
- val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
-
- val defaultPrice = StockPrice("", 1000)
-
- private var fileOutput: Boolean = false
- private var hostName: String = null
- private var port: Int = 0
- private var outputPath: String = null
-
- def main(args: Array[String]) {
-
- if (!parseParameters(args)) {
- return
- }
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
-
- //Step 1
- //Read a stream of stock prices from different sources and union it into one stream
-
- //Read from a socket stream at map it to StockPrice objects
- val socketStockStream = env.socketTextStream(hostName, port).map(x => {
- val split = x.split(",")
- StockPrice(split(0), split(1).toDouble)
- })
-
- //Generate other stock streams
- val SPX_Stream = env.addSource(generateStock("SPX")(10))
- val FTSE_Stream = env.addSource(generateStock("FTSE")(20))
- val DJI_Stream = env.addSource(generateStock("DJI")(30))
- val BUX_Stream = env.addSource(generateStock("BUX")(40))
-
- //Union all stock streams together
- val stockStream = socketStockStream.union(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
-
- //Step 2
- //Compute some simple statistics on a rolling window
- val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
-
- val lowest = windowedStream.minBy("price")
- val maxByStock = windowedStream.groupBy("symbol").maxBy("price").getDiscretizedStream
- val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _).getDiscretizedStream
-
- //Step 3
- //Use delta policy to create price change warnings,
- // and also count the number of warning every half minute
-
- val priceWarnings = stockStream.groupBy("symbol")
- .window(Delta.of(0.05, priceChange, defaultPrice))
- .mapWindow(sendWarning _)
- .flatten()
-
- val warningsPerStock = priceWarnings
- .map(Count(_, 1))
- .window(Time.of(30, SECONDS))
- .groupBy("symbol")
- .sum("count")
- .flatten()
-
- //Step 4
- //Read a stream of tweets and extract the stock symbols
-
- val tweetStream = env.addSource(generateTweets)
-
- val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
- .map(_.toUpperCase())
- .filter(symbols.contains(_))
-
- val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
- .window(Time.of(30, SECONDS))
- .groupBy("symbol")
- .sum("count")
- .flatten()
-
- //Step 5
- //For advanced analysis we join the number of tweets and
- //the number of price change warnings by stock
- //for the last half minute, we keep only the counts.
- //This information is used to compute rolling correlations
- //between the tweets and the price changes
-
- val tweetsAndWarning = warningsPerStock
- .join(tweetsPerStock)
- .onWindow(30, SECONDS)
- .where("symbol")
- .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
-
-
- val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
- .mapWindow(computeCorrelation _)
- .flatten()
-
- if (fileOutput) {
- rollingCorrelation.writeAsText(outputPath, 1)
- } else {
- rollingCorrelation.print
- }
-
- env.execute("Stock stream")
- }
-
- def priceChange(p1: StockPrice, p2: StockPrice): Double = {
- Math.abs(p1.price / p2.price - 1)
- }
-
- def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
- if (ts.nonEmpty) {
- out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
- }
- }
-
- def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
- if (ts.nonEmpty) out.collect(ts.head.symbol)
- }
-
- def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
- if (input.nonEmpty) {
- val var1 = input.map(_._1)
- val mean1 = average(var1)
- val var2 = input.map(_._2)
- val mean2 = average(var2)
-
- val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
- val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
- val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
-
- out.collect(cov / (d1 * d2))
- }
- }
-
- def generateStock(symbol: String)(sigma: Int) = {
- var price = 1000.0
- () =>
- price = price + Random.nextGaussian * sigma
- Thread.sleep(Random.nextInt(200))
- StockPrice(symbol, price)
-
- }
-
- def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
- num.toDouble(ts.sum) / ts.size
- }
-
- def generateTweets = {
- () =>
- val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
- Thread.sleep(Random.nextInt(500))
- s.mkString(" ")
- }
-
- private def parseParameters(args: Array[String]): Boolean = {
- if (args.length == 3) {
- fileOutput = true
- hostName = args(0)
- port = args(1).toInt
- outputPath = args(2)
- } else if (args.length == 2) {
- hostName = args(0)
- port = args(1).toInt
- } else {
- System.err.println("Usage: StockPrices <hostname> <port> [<output path>]")
- return false
- }
- true
- }
-
-}