You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/04 15:50:06 UTC

[2/2] flink git commit: [streaming] Removed StockPrices example

[streaming] Removed StockPrices example

The StockPrices example was written to accompany a blog post.
It has been moved to another branch and removed from master.

Closes #777


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39010dce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39010dce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39010dce

Branch: refs/heads/master
Commit: 39010dce426936f66a3f9440e77d5c4edadd7bd9
Parents: 23a3646
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 10:11:42 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Thu Jun 4 15:48:48 2015 +0200

----------------------------------------------------------------------
 .../flink-streaming-examples/pom.xml            |  23 -
 .../examples/windowing/StockPrices.java         | 438 -------------------
 .../scala/examples/join/WindowJoin.scala        |   3 +-
 .../scala/examples/windowing/StockPrices.scala  | 228 ----------
 4 files changed, 2 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
index 2ebd606..6c75dc6 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-examples/pom.xml
@@ -314,29 +314,6 @@ under the License.
 						</configuration>
 					</execution>
 
-					<!-- StockPrices -->
-					<execution>
-						<id>StockPrices</id>
-						<phase>package</phase>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<configuration>
-							<classifier>StockPrices</classifier>
-
-							<archive>
-								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.windowing.StockPrices</program-class>
-								</manifestEntries>
-							</archive>
-
-							<includes>
-								<include>org/apache/flink/streaming/examples/windowing/StockPrices.class</include>
-								<include>org/apache/flink/streaming/examples/windowing/StockPrices$*.class</include>
-							</includes>
-						</configuration>
-					</execution>
-
 					<!-- TopSpeedWindowing -->
 					<execution>
 						<id>TopSpeedWindowing</id>

http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
deleted file mode 100644
index 56abb12..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.windowing;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.WindowedDataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
-
-/**
- * This example showcases a moderately complex Flink Streaming pipeline.
- * It to computes statistics on stock market data that arrive continuously,
- * and combines the stock market data with tweet streams.
- * For a detailed explanation of the job, check out the blog post unrolling it.
- * To run the example make sure that the service providing the text data
- * is already up and running.
- *
- * <p>
- * To start an example socket text stream on your local machine run netcat from
- * a command line: <code>nc -lk 9999</code>, where the parameter specifies the
- * port number.
- *
- *
- * <p>
- * Usage:
- * <code>StockPrices &lt;hostname&gt; &lt;port&gt; &lt;result path&gt;</code>
- * <br>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>union and join data streams,
- * <li>use different windowing policies,
- * <li>define windowing aggregations.
- * </ul>
- *
- * @see <a href="www.openbsd.org/cgi-bin/man.cgi?query=nc">netcat</a>
- * @see <a href="http://flink.apache.org/news/2015/02/09/streaming-example.html">blogpost</a>
- */
-public class StockPrices {
-
-	private static final ArrayList<String> SYMBOLS = new ArrayList<String>(Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));
-	private static final Double DEFAULT_PRICE = 1000.;
-	private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	@SuppressWarnings({ "serial", "unused" })
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		//Step 1 
-	    //Read a stream of stock prices from different sources and union it into one stream
-		
-		//Read from a socket stream at map it to StockPrice objects
-		DataStream<StockPrice> socketStockStream = env.socketTextStream(hostName, port)
-				.map(new MapFunction<String, StockPrice>() {
-					private String[] tokens;
-
-					@Override
-					public StockPrice map(String value) throws Exception {
-						tokens = value.split(",");
-						return new StockPrice(tokens[0], Double.parseDouble(tokens[1]));
-					}
-				});
-
-		//Generate other stock streams
-		DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
-		DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
-		DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
-		DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));
-
-		//Merge all stock streams together
-		@SuppressWarnings("unchecked")
-		DataStream<StockPrice> stockStream = socketStockStream.union(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);
-		
-		//Step 2
-	    //Compute some simple statistics on a rolling window
-		WindowedDataStream<StockPrice> windowedStream = stockStream
-				.window(Time.of(10, TimeUnit.SECONDS))
-				.every(Time.of(5, TimeUnit.SECONDS));
-
-		DataStream<StockPrice> lowest = windowedStream.minBy("price").flatten();
-		DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol").maxBy("price").flatten();
-		DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol").mapWindow(new WindowMean()).flatten();
-
-		//Step 3
-		//Use  delta policy to create price change warnings, and also count the number of warning every half minute
-
-		DataStream<String> priceWarnings = stockStream.groupBy("symbol")
-				.window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
-					@Override
-					public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
-						return Math.abs(oldDataPoint.price - newDataPoint.price);
-					}
-				}, DEFAULT_STOCK_PRICE))
-				.mapWindow(new SendWarning()).flatten();
-
-
-		DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
-			@Override
-			public Count map(String value) throws Exception {
-				return new Count(value, 1);
-			}
-		}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();
-
-		//Step 4
-		//Read a stream of tweets and extract the stock symbols
-		DataStream<String> tweetStream = env.addSource(new TweetSource());
-
-		DataStream<String> mentionedSymbols = tweetStream.flatMap(new FlatMapFunction<String, String>() {
-			@Override
-			public void flatMap(String value, Collector<String> out) throws Exception {
-				String[] words = value.split(" ");
-				for (String word : words) {
-					out.collect(word.toUpperCase());
-				}
-			}
-		}).filter(new FilterFunction<String>() {
-			@Override
-			public boolean filter(String value) throws Exception {
-				return SYMBOLS.contains(value);
-			}
-		});
-
-		DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
-			@Override
-			public Count map(String value) throws Exception {
-				return new Count(value, 1);
-			}
-		}).groupBy("symbol")
-				.window(Time.of(30, TimeUnit.SECONDS))
-				.sum("count").flatten();
-
-		//Step 5
-		//For advanced analysis we join the number of tweets and the number of price change warnings by stock
-		//for the last half minute, we keep only the counts. We use this information to compute rolling correlations
-		//between the tweets and the price changes
-
-		DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
-				.onWindow(30, TimeUnit.SECONDS)
-				.where("symbol")
-				.equalTo("symbol")
-				.with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
-					@Override
-					public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
-						return new Tuple2<Integer, Integer>(first.count, second.count);
-					}
-				});
-
-		DataStream<Double> rollingCorrelation = tweetsAndWarning
-				.window(Time.of(30, TimeUnit.SECONDS))
-				.mapWindow(new WindowCorrelation()).flatten();
-
-		if (fileOutput) {
-			rollingCorrelation.writeAsText(outputPath, 1);
-		} else {
-			rollingCorrelation.print();
-		}
-
-		env.execute("Stock stream");
-
-	}
-
-	// *************************************************************************
-	// DATA TYPES
-	// *************************************************************************
-
-	public static class StockPrice implements Serializable {
-
-		private static final long serialVersionUID = 1L;
-		public String symbol;
-		public Double price;
-
-		public StockPrice() {
-		}
-
-		public StockPrice(String symbol, Double price) {
-			this.symbol = symbol;
-			this.price = price;
-		}
-
-		@Override
-		public String toString() {
-			return "StockPrice{" +
-					"symbol='" + symbol + '\'' +
-					", count=" + price +
-					'}';
-		}
-	}
-
-	public static class Count implements Serializable{
-		
-		private static final long serialVersionUID = 1L;
-		public String symbol;
-		public Integer count;
-
-		public Count() {
-		}
-
-		public Count(String symbol, Integer count) {
-			this.symbol = symbol;
-			this.count = count;
-		}
-
-		@Override
-		public String toString() {
-			return "Count{" +
-					"symbol='" + symbol + '\'' +
-					", count=" + count +
-					'}';
-		}
-	}
-
-	// *************************************************************************
-	// USER FUNCTIONS
-	// *************************************************************************
-
-	public final static class StockSource extends RichSourceFunction<StockPrice> {
-
-		private static final long serialVersionUID = 1L;
-		private Double price;
-		private String symbol;
-		private Integer sigma;
-		private transient Random random;
-
-
-
-		public StockSource(String symbol, Integer sigma) {
-			this.symbol = symbol;
-			this.sigma = sigma;
-			price = DEFAULT_PRICE;
-
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			random = new Random();
-
-		}
-
-		@Override
-		public boolean reachedEnd() throws Exception {
-			return false;
-		}
-
-		@Override
-		public StockPrice next() throws Exception {
-			price = price + random.nextGaussian() * sigma;
-			Thread.sleep(random.nextInt(200));
-			return new StockPrice(symbol, price);
-		}
-
-	}
-
-	public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
-
-		private static final long serialVersionUID = 1L;
-		private Double sum = 0.0;
-		private Integer count = 0;
-		private String symbol = "";
-
-		@Override
-		public void mapWindow(Iterable<StockPrice> values, Collector<StockPrice> out) throws Exception {
-			if (values.iterator().hasNext()) {
-
-				for (StockPrice sp : values) {
-					sum += sp.price;
-					symbol = sp.symbol;
-					count++;
-				}
-				out.collect(new StockPrice(symbol, sum / count));
-			}
-		}
-	}
-
-	public static final class TweetSource extends RichSourceFunction<String> {
-
-		private static final long serialVersionUID = 1L;
-		private transient Random random;
-		private transient StringBuilder stringBuilder;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			random = new Random();
-			stringBuilder = new StringBuilder();
-		}
-
-		@Override
-		public boolean reachedEnd() throws Exception {
-			return false;
-		}
-
-		@Override
-		public String next() throws Exception {
-			stringBuilder.setLength(0);
-			for (int i = 0; i < 3; i++) {
-				stringBuilder.append(" ");
-				stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
-			}
-			Thread.sleep(500);
-			return stringBuilder.toString();
-		}
-
-	}
-
-	public static final class SendWarning implements WindowMapFunction<StockPrice, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void mapWindow(Iterable<StockPrice> values, Collector<String> out) throws Exception {
-			if (values.iterator().hasNext()) {
-				out.collect(values.iterator().next().symbol);
-			}
-		}
-	}
-
-	public static final class WindowCorrelation implements WindowMapFunction<Tuple2<Integer, Integer>, Double> {
-
-		private static final long serialVersionUID = 1L;
-		private Integer leftSum;
-		private Integer rightSum;
-		private Integer count;
-
-		private Double leftMean;
-		private Double rightMean;
-
-		private Double cov;
-		private Double leftSd;
-		private Double rightSd;
-
-		@Override
-		public void mapWindow(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out) throws Exception {
-
-			leftSum = 0;
-			rightSum = 0;
-			count = 0;
-
-			cov = 0.;
-			leftSd = 0.;
-			rightSd = 0.;
-
-			//compute mean for both sides, save count
-			for (Tuple2<Integer, Integer> pair : values) {
-				leftSum += pair.f0;
-				rightSum += pair.f1;
-				count++;
-			}
-
-			leftMean = leftSum.doubleValue() / count;
-			rightMean = rightSum.doubleValue() / count;
-
-			//compute covariance & std. deviations
-			for (Tuple2<Integer, Integer> pair : values) {
-				cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
-			}
-
-			for (Tuple2<Integer, Integer> pair : values) {
-				leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
-				rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
-			}
-			leftSd = Math.sqrt(leftSd);
-			rightSd = Math.sqrt(rightSd);
-
-			out.collect(cov / (leftSd * rightSd));
-		}
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String hostName;
-	private static int port;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		// parse input arguments
-		if (args.length == 3) {
-			fileOutput = true;
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-			outputPath = args[2];
-		} else if (args.length == 2) {
-			hostName = args[0];
-			port = Integer.valueOf(args[1]);
-		} else {
-			System.err.println("Usage: StockPrices <hostname> <port> [<output path>]");
-			return false;
-		}
-		return true;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 9032abd..239f1fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -121,7 +121,8 @@ object WindowJoin {
         salariesPath = args(1)
         outputPath = args(2)
       } else {
-        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> <input path 2> " + "<result path>")
+        System.err.println("Usage: WindowJoin <result path> or WindowJoin <input path 1> " +
+          "<input path 2> <result path>")
         return false
       }
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/39010dce/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
deleted file mode 100644
index 4940c6c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.scala.examples.windowing
-
-import java.util.concurrent.TimeUnit._
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.windowing.Delta
-import org.apache.flink.streaming.api.windowing.helper.Time
-import org.apache.flink.util.Collector
-
-import scala.util.Random
-
-/**
- * This example showcases a moderately complex Flink Streaming pipeline.
- * It to computes statistics on stock market data that arrive continuously,
- * and combines the stock market data with tweet streams.
- * For a detailed explanation of the job, check out the
- * [[http://flink.apache.org/news/2015/02/09/streaming-example.html blog post]]
- * unrolling it. To run the example make sure that the service providing
- * the text data is already up and running.
- *
- * To start an example socket text stream on your local machine run netcat
- * from a command line, where the parameter specifies the port number:
- *
- * {{{
- *   nc -lk 9999
- * }}}
- *
- * Usage:
- * {{{
- *   StockPrices <hostname> <port> <output path>
- * }}}
- *
- * This example shows how to:
- *
- *   - union and join data streams,
- *   - use different windowing policies,
- *   - define windowing aggregations.
- */
-object StockPrices {
-
-  case class StockPrice(symbol: String, price: Double)
-  case class Count(symbol: String, count: Int)
-
-  val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
-
-  val defaultPrice = StockPrice("", 1000)
-
-  private var fileOutput: Boolean = false
-  private var hostName: String = null
-  private var port: Int = 0
-  private var outputPath: String = null
-
-  def main(args: Array[String]) {
-
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    //Step 1 
-    //Read a stream of stock prices from different sources and union it into one stream
-
-    //Read from a socket stream at map it to StockPrice objects
-    val socketStockStream = env.socketTextStream(hostName, port).map(x => {
-      val split = x.split(",")
-      StockPrice(split(0), split(1).toDouble)
-    })
-
-    //Generate other stock streams
-    val SPX_Stream = env.addSource(generateStock("SPX")(10))
-    val FTSE_Stream = env.addSource(generateStock("FTSE")(20))
-    val DJI_Stream = env.addSource(generateStock("DJI")(30))
-    val BUX_Stream = env.addSource(generateStock("BUX")(40))
-
-    //Union all stock streams together
-    val stockStream = socketStockStream.union(SPX_Stream, FTSE_Stream, DJI_Stream, BUX_Stream)
-
-    //Step 2
-    //Compute some simple statistics on a rolling window
-    val windowedStream = stockStream.window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))
-
-    val lowest = windowedStream.minBy("price")
-    val maxByStock = windowedStream.groupBy("symbol").maxBy("price").getDiscretizedStream
-    val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _).getDiscretizedStream
-
-    //Step 3 
-    //Use delta policy to create price change warnings,
-    // and also count the number of warning every half minute
-
-    val priceWarnings = stockStream.groupBy("symbol")
-      .window(Delta.of(0.05, priceChange, defaultPrice))
-      .mapWindow(sendWarning _)
-      .flatten()
-      
-    val warningsPerStock = priceWarnings
-      .map(Count(_, 1))
-      .window(Time.of(30, SECONDS))
-      .groupBy("symbol")
-      .sum("count")
-      .flatten()
-      
-    //Step 4 
-    //Read a stream of tweets and extract the stock symbols
-
-    val tweetStream = env.addSource(generateTweets)
-
-    val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
-      .map(_.toUpperCase())
-      .filter(symbols.contains(_))                     
-                    
-    val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
-      .window(Time.of(30, SECONDS))
-      .groupBy("symbol")
-      .sum("count")
-      .flatten()
-      
-    //Step 5
-    //For advanced analysis we join the number of tweets and
-    //the number of price change warnings by stock
-    //for the last half minute, we keep only the counts.
-    //This information is used to compute rolling correlations
-    //between the tweets and the price changes                              
-
-    val tweetsAndWarning = warningsPerStock
-      .join(tweetsPerStock)
-      .onWindow(30, SECONDS)
-      .where("symbol")
-      .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }
-    
-
-    val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
-      .mapWindow(computeCorrelation _)
-      .flatten()
-
-    if (fileOutput) {
-      rollingCorrelation.writeAsText(outputPath, 1)
-    } else {
-      rollingCorrelation.print
-    }
-
-    env.execute("Stock stream")
-  }
-
-  def priceChange(p1: StockPrice, p2: StockPrice): Double = {
-    Math.abs(p1.price / p2.price - 1)
-  }
-
-  def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
-    if (ts.nonEmpty) {
-      out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
-    }
-  }
-
-  def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
-    if (ts.nonEmpty) out.collect(ts.head.symbol)
-  }
-
-  def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
-    if (input.nonEmpty) {
-      val var1 = input.map(_._1)
-      val mean1 = average(var1)
-      val var2 = input.map(_._2)
-      val mean2 = average(var2)
-
-      val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
-      val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
-      val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))
-
-      out.collect(cov / (d1 * d2))
-    }
-  }
-
-  def generateStock(symbol: String)(sigma: Int) = {
-    var price = 1000.0
-    () =>
-      price = price + Random.nextGaussian * sigma
-      Thread.sleep(Random.nextInt(200))
-      StockPrice(symbol, price)
-
-  }
-
-  def average[T](ts: Iterable[T])(implicit num: Numeric[T]) = {
-    num.toDouble(ts.sum) / ts.size
-  }
-
-  def generateTweets = {
-    () =>
-      val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
-      Thread.sleep(Random.nextInt(500))
-      s.mkString(" ")
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 3) {
-      fileOutput = true
-      hostName = args(0)
-      port = args(1).toInt
-      outputPath = args(2)
-    } else if (args.length == 2) {
-      hostName = args(0)
-      port = args(1).toInt
-    } else {
-      System.err.println("Usage: StockPrices <hostname> <port> [<output path>]")
-      return false
-    }
-    true
-  }
-
-}