Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:55:00 UTC

[19/57] [abbrv] incubator-predictionio git commit: [PIO-107] Removal of examples under examples/experimental.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/README.md
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/README.md b/examples/experimental/scala-stock/README.md
deleted file mode 100644
index d780820..0000000
--- a/examples/experimental/scala-stock/README.md
+++ /dev/null
@@ -1,388 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Using PredictionIO to run predictions with Yahoo Finance data
-
-This tutorial assumes you have gone through the quickstart guide for
-PredictionIO.
-
-## Setting up the environment
-
-### Step 1: Get your Pandas
-Where: PredictionIO-Python-SDK
-
-Run: sudo pip install pandas==0.13.1
-
-If the pip command is not found, install pip first:
-
-curl -O https://bootstrap.pypa.io/get-pip.py
-
-python get-pip.py
-
-and then run sudo pip install pandas==0.13.1
-
-### Step 2: Edit import_yahoo.py
-Where: PredictionIO-Python-SDK/examples/import_yahoo.py
-
-At the end of file, find the following:
-```
-if __name__ == '__main__':
-  #import_all(app_id=?)
-  import_data_with_gaps(app_id=1)
-  #import_one(app_id=1)
-```
-Uncomment the first call (`import_all`), replacing `app_id` with your own app ID, and comment out the second call (`import_data_with_gaps`).
-
-### Step 3: Import Yahoo Finance data.
-Where: PredictionIO-Python-SDK/examples
-
-Run: sudo python -m examples.import_yahoo
-
-
-### Step 4: Make the distribution of PredictionIO
-Where: your cloned PredictionIO source directory (make sure the code is up to date: git pull)
-```
-./make-distribution.sh
-```
-### Step 5: Edit scala-stock
-Go to examples/scala-stock/src/main/scala
-
-Edit YahooDataSource.scala
-
-Go to the end of the file, to the PredefinedDSP object
-
-Edit the appId to match the app_id you used in Step 2
-
-### Step 6: Run scala-stock
-Go to PredictionIO/examples/scala-stock
-
-Now type:
-```
-../../bin/pio run --asm org.apache.predictionio.examples.stock.YahooDataSourceRun -- --master <your Spark master address, found at http://localhost:8080> --driver-memory <4-12G>
-```
-### Step 7: Open dashboard and view results
-In the PredictionIO folder
-
-Type bin/pio dashboard
-
-Go to http://localhost:9000 to view the output
-
-
----
-# PredictionIO Scala Stock Tutorial
-
-## Implementing New Indicators
-
-If you don't need to introduce any new indicators, skip this step. To introduce a new indicator, go to Indicators.scala and extend the `BaseIndicator` class. `ShiftsIndicator` can serve as an example of how to do this:
-
-```
-abstract class BaseIndicator extends Serializable {
-  def getTraining(logPrice: Series[DateTime, Double]): Series[DateTime, Double]
-  def getOne(input: Series[DateTime, Double]): Double
-  def getMinWindowSize(): Int
-}
-```
-#### `getTraining`
-###### Parameters:
-This function takes in a `Series` of log prices (`logPrice`) created from the closing prices of the Yahoo Finance data imported in YahooFinance.scala. `logPrice` holds the logarithm of each price in the series.
-###### Functionality:
-Performs a transformation over every value in the series to return a training series that will be used in the regression.
-#### `getOne()`
-###### Parameters:
-This function takes in a smaller window of the price series, representing the closing prices of a single stock over the last `getMinWindowSize()` days.
-###### Functionality:
-Performs the same calculation as `getTraining()` over the smaller window defined by the return value of `getMinWindowSize()`, and returns only the latest value.
-#### `getMinWindowSize()`
-###### Functionality:
-Returns the minimum window size required to do a single calculation of the indicator function implemented by `getTraining()`.
-For example, an indicator that requires seeing the previous day to make a calculation would have a minimum window size of 2 days.
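-
-As an illustration, here is a minimal, hypothetical indicator that uses the
-n-day change in log price, modeled on `ShiftsIndicator` (a sketch only, not
-part of the shipped code):
-
-```
-class NDayChangeIndicator(n: Int) extends BaseIndicator {
-  // Training series: the n-day difference of log prices, with NAs filled as 0.
-  def getTraining(logPrice: Series[DateTime, Double]): Series[DateTime, Double] =
-    (logPrice - logPrice.shift(n)).fillNA(_ => 0.0)
-
-  // Latest n-day change, used as a single prediction feature.
-  def getOne(logPrice: Series[DateTime, Double]): Double =
-    getTraining(logPrice).last
-
-  // Need n + 1 prices to compute one n-day change.
-  def getMinWindowSize(): Int = n + 1
-}
-```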
-
-## Running New Indicators
-To change which indicators to include in your run, open YahooDataSource.scala. This file imports the Yahoo Finance data, prepares it, and runs the predictive engine.
-
-Navigate to Workflow.run() in the `YahooDataSourceRun` object:
-```
-   Workflow.run(
-      dataSourceClassOpt = Some(classOf[YahooDataSource]),
-      dataSourceParams = dsp,
-      preparatorClassOpt = Some(IdentityPreparator(classOf[DataSource])),
-      algorithmClassMapOpt = Some(Map(
-        //"" -> classOf[MomentumStrategy]
-        "" -> classOf[RegressionStrategy]
-      )),
-      //algorithmParamsList = Seq(("", momentumParams)),
-      algorithmParamsList = Seq(("", RegressionStrategyParams(Seq[(String, BaseIndicator)](
-        ("RSI1", new RSIIndicator(period=1)), 
-        ("RSI5", new RSIIndicator(period=5)), 
-        ("RSI22", new RSIIndicator(period=22))), 
-      200))),
-      servingClassOpt = Some(FirstServing(classOf[EmptyStrategy])),
-      metricsClassOpt = Some(classOf[BacktestingMetrics]),
-      metricsParams = metricsParams,
-      params = WorkflowParams(
-        verbose = 0,
-        saveModel = false,
-```
-#### `algorithmParamsList`
-Edit these parameters to include any newly implemented indicators that the predictive model should use. In this example you would replace `RSIIndicator` with your new indicator. If you would like to modify the strategy used, make sure to also change the parameter class, i.e. `RegressionStrategyParams`.
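-
-For instance, assuming you had added a hypothetical `NDayChangeIndicator` (like
-the sketch in the indicator section above), the parameter list might become
-something like:
-```
-      algorithmParamsList = Seq(("", RegressionStrategyParams(Seq[(String, BaseIndicator)](
-        ("NDC5", new NDayChangeIndicator(5)),
-        ("RSI14", new RSIIndicator(14))),
-      200))),
-```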
-
-### Viewing Your Results
-To view the backtesting metrics, open up localhost:9000 in your browser and click on the 'HTML' button for the run whose results you want to see. On this page, the Sharpe ratio indicates how effective your indicator implementations are at prediction.
-
-### Tips and Tricks
-To reduce run time, run your code on a smaller dataset:  
-1. Open `YahooDataSource.scala`  
-2. Look for the line `val dsp = PredefinedDSP.BigSP500` and comment it out  
-3. Uncomment the line `val dsp = PredefinedDSP.SmallSP500` (see the sketch after this list)  
-4. In the `PredefinedDSP` object, switch the `appId` for `BigSP500` with the `appId` for `SmallSP500`. *Note: it should match the appId in your runnable.*  
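-
-After steps 2 and 3, the relevant lines in `YahooDataSource.scala` would look
-roughly like this (a sketch; surrounding code omitted):
-```
-//val dsp = PredefinedDSP.BigSP500
-val dsp = PredefinedDSP.SmallSP500
-```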
-
-To view standard output:  
-1. Open localhost:8080 (Spark)  
-2. Click on the most recent worker  
-3. Click on stdout or stderr to view the data in 100 KB increments  
-
-
----
-
-
-# OLD DOCUMENTATION
-
-# (This doc is out-of-sync with the actual implementation)
-
-## How to implement a stock prediction algorithm
-
-### Fetch data from external source
-You only need to do it once. (Unless you want to update your data.)
-
-We provide a script which extracts historical data of SP500 stocks from Yahoo
-and stores the data in your local data storage. Make sure you have set up storage
-according to the [storage setup instructions](/README.md). Feel free to substitute
-your favorite stock data source.
-
-Specify environment. (Temporary.)
-```
-$ cd $PIO_HOME/examples
-$ set -a
-$ source ../conf/pio-env.sh
-$ set +a
-```
-where `$PIO_HOME` is the root directory of PredictionIO's code tree.
-
-Run the fetch script.
-```
-$ ../sbt/sbt "runMain org.apache.predictionio.examples.stock.FetchMain"
-```
-As SP500 constituents change all the time, the hardcoded list may not reflect
-the current state and the script may fail to extract delisted tickers. Whilst
-the stock engine is designed to accommodate missing / incomplete data, you may as
-well update the ticker list in the [stock engine
-settings](/engines/src/main/scala/stock/Stock.scala).
-
-### High Level Description
-A stock prediction algorithm employs a *rolling window* estimation method. For
-each window, the algorithm trains a model with the data in that window, and the
-model is then evaluated on a following set of data of length `testingWindowSize`.
-
-For example, it builds a model with data from Aug 31, 2012 to Aug 31, 2013, and
-evaluates the model with data from Sept 1, 2013 to Oct 1, 2013. Then the
-training window is rolled one month forward: the model is trained with data from
-Sept 30, 2012 to Sept 30, 2013, and evaluated with data from Oct 1, 2013 to
-Nov 1, 2013, and so on.
-
-Training Data | Testing Data
---------------|-------------
-2012-08-31 -> 2013-08-31 | 2013-09-01 -> 2013-10-01
-2012-09-30 -> 2013-09-30 | 2013-10-01 -> 2013-11-01
-2012-10-31 -> 2013-10-31 | 2013-11-01 -> 2013-12-01
-2012-11-30 -> 2013-11-30 | 2013-12-01 -> 2014-01-01
-2012-12-31 -> 2013-12-31 | 2014-01-01 -> 2014-02-01
-... | ...
-
-For an algorithm developer, the task is to create a model with the training
-data, and then make predictions based on the testing data.
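-
-A sketch of this split in terms of day indices (an assumed helper written for
-illustration, not part of the engine code):
-
-```scala
-// Enumerate (trainingRange, testingRange) pairs as half-open index intervals.
-def rollingWindows(fromIdx: Int, untilIdx: Int,
-                   trainingWindowSize: Int, testingWindowSize: Int) =
-  (fromIdx until untilIdx by testingWindowSize).map { testStart =>
-    val testEnd = math.min(testStart + testingWindowSize, untilIdx)
-    ((testStart - trainingWindowSize, testStart), (testStart, testEnd))
-  }
-```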
-
-### Key Data Structures
-
-#### Training Data
-[Training data](Data.scala) has 4 main fields. `tickers` is the list of tickers
-used for training. `mktTicker` is the market ticker, used for calculating
-beta-related metrics. `timeIndex` is a list of dates representing the time
-window used for training. `price` is a map from ticker to an array of stock prices;
-the array has the same length as `timeIndex`. Notice that only tickers that
-are active throughout the whole time window are included. For example, if
-the `TrainingData`'s time window is from 2012-01-01 to 2013-01-01, Facebook (FB)
-will not be included in the training data.
-
-```scala
-class TrainingData(
-  val tickers: Seq[String],
-  val mktTicker: String,
-  val timeIndex: Array[DateTime],
-  val price: Array[(String, Array[Double])])
-```
-
-#### Query
-[Query](Data.scala) is the input for prediction. Most of the fields resemble
-`TrainingData`, with an additional field `tomorrow` indicating the date of the
-prediction output. Algorithm builders take this as input, together with the
-trained model created with `TrainingData`, to make a prediction.
-
-```scala
-class Query(
-  val mktTicker: String,
-  val tickerList: Seq[String],
-  val timeIndex: Array[DateTime],
-  val price: Array[(String, Array[Double])],
-  val tomorrow: DateTime)
-```
-
-#### Target
-[Target](Data.scala) is the output of prediction. It is essentially a map from
-ticker to predicted return. Notice that the prediction need not match the
-`Query`; if an algorithm cannot make a prediction for some symbols, it can simply
-leave them out.
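-
-A minimal sketch of the shape described above, assuming `Target` simply wraps a
-ticker-to-return map (the actual class in Data.scala may differ; the newer code
-in this example uses `Prediction(HashMap[String, Double])` instead):
-
-```scala
-// Hypothetical sketch only.
-class Target(val data: Map[String, Double]) extends Serializable
-```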
-
-### The Algorithm
-Stock prediction algorithms should extend the [StockAlgorithm](Stock.scala) class.
-
-```scala
-abstract class StockAlgorithm[P <: Params : ClassTag, M : ClassTag]
-  extends LAlgorithm[P, TrainingData, M, Query, Target] {
-  def train(trainingData: TrainingData): M
-  def predict(model: M, query: Query): Target
-}
-```
-
-#### RegressionAlgorithm
-[RegressionAlgorithm](RegressionAlgorithm.scala) creates a linear model for each
-stock using a feature vector comprised of the 1-day, 1-week, and 1-month returns
-of the stock.
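-
-A sketch of that feature computation, assuming log prices in a saddle `Series`
-and mirroring the shift-based returns used elsewhere in this example (the actual
-RegressionAlgorithm.scala may differ):
-
-```scala
-// 1-day, 1-week (~5 trading days) and 1-month (~22 trading days) log returns.
-def features(logPrice: Series[DateTime, Double]): Array[Double] = {
-  def ret(d: Int): Double = (logPrice - logPrice.shift(d)).fillNA(_ => 0.0).last
-  Array(ret(1), ret(5), ret(22))
-}
-```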
-
-#### RandomAlgorithm
-[RandomAlgorithm](RandomAlgorithm.scala) produces a Gaussian random variable
-with scaling and drift.
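-
-In other words, each predicted return is drawn roughly as follows, where `drift`
-and `scale` correspond to the parameters mentioned above (a sketch; the actual
-RandomAlgorithm.scala may differ):
-
-```scala
-val predictedReturn = drift + scale * scala.util.Random.nextGaussian()
-```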
-
-### Evaluation: Backtesting Metrics
-This is the most common method in quantitative equity research. We test the
-stock prediction algorithm against historical data. For each day in the
-evaluation period, we open or close positions according to the prediction of the
-algorithm. This allows us to simulate the daily P/L, volatility, and drawdown of
-the prediction algorithm.
-
-[BacktestingMetrics](BacktestingMetrics.scala) takes three parameters:
-`enterThreshold`, the minimum predicted return to open a new position;
-`exitThreshold`, the maximum predicted return at which an existing position is
-closed; and `maxPositions`, the maximum number of open positions. Every day,
-this metric adjusts its portfolio based on the stock algorithm's prediction
-`Target`. For a current position, if its predicted return is lower than
-`exitThreshold`, the metric closes that position. It then looks at all stocks
-whose predicted return is higher than `enterThreshold`, and repeatedly opens new
-positions, starting from the highest predicted values, until it reaches
-`maxPositions`.
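-
-The per-stock enter/exit decision therefore reduces to a simple threshold rule,
-mirroring the evaluator's `evaluateUnit` method:
-
-```scala
-// 1 = enter, -1 = exit, 0 = do nothing for this ticker.
-val dir = predictedReturn match {
-  case p if p >= enterThreshold => 1
-  case p if p <= exitThreshold  => -1
-  case _                        => 0
-}
-```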
-
-##### First Evaluation: Demo1
-[stock.Demo1](Demo1.scala) shows a sample Runner program. To run this
-evaluation, you have to specify two sets of parameters:
-
-1. `DataSourceParams` governs the tickers and the time window you wish to
-   evaluate. Here are the parameters we used in stock.Demo1.
-   ```scala
-   val dataSourceParams = new DataSourceParams(
-     baseDate = new DateTime(2004, 1, 1, 0, 0),
-     fromIdx = 400,
-     untilIdx = 1000,
-     trainingWindowSize = 300,
-     evaluationInterval = 20,
-     marketTicker = "SPY",
-     tickerList = Seq("AAPL"))
-   ```
-   This means we start our evaluation 400 market days after the first day of
-   2004 and run until day 1000. Our `evaluationInterval` is 20 and `trainingWindowSize`
-   is 300, meaning that we use day 100 until day 400 as *the first slice* of
-   training data, and evaluate with data from day 400 until 420. Then this
-   process repeats: day 120 until 420 for training, and evaluation with data
-   from day 420 until 440, until it reaches `untilIdx`. We only specify one
-   ticker, AAPL, for now, but our stock engine actually supports multiple
-   tickers.
-
-2. `BacktestingParams` governs the backtesting evaluation.
-   ```scala
-   val backtestingParams = BacktestingParams(enterThreshold = 0.001,
-                                             exitThreshold = 0.0)
-   ```
-   As explained above, the backtesting evaluator opens a new long position when
-   the predicted return of a stock is higher than 0.001, and exits such a position
-   when the prediction is lower than 0.0.
-
-You can run the evaluation with the following command.
-```
-$ cd $PIO_HOME/examples
-$ ../bin/pio-run org.apache.predictionio.examples.stock.Demo1
-```
-
-You should see that, trading from April 2005 until Dec 2007, the NAV went
-from $1,000,000 to $1,433,449.24. Yay!!!
-
-(**Disclaimer**: I cherry-picked these parameters to make the demo look nice. A
-buy-and-hold strategy of AAPL from April 2005 until 2007 performs even better.
-And you will lose money if you let it run for longer: set `untilIdx = 1400`.)
-
-##### Second Evaluation
-[stock.Demo2](Demo2.scala) shows us how to run backtesting against a basket of
-stocks. You simply specify a list of tickers in `tickerList`. `DataSource` will
-handle cases where a ticker was absent.
-
-In `BacktestingParams`, you may allow more stocks to be held concurrently by
-raising `maxPositions`; the backtesting class essentially divides the available
-cash evenly among the remaining position slots. The demo is run the same way, by
-specifying the main class to run.
-```
-$ cd $PIO_HOME/examples
-$ ../bin/pio-run org.apache.predictionio.examples.stock.Demo2
-```
-
-The result is not as great, of course.
-
-##### Third Evaluation
-Now, you may start wondering what actually contributes to the profit, and may
-want to dive deeper into the data. [DailyMetrics](DailyMetrics.scala) is a
-helper metric which helps you understand the performance of different
-parameter settings. It aggregates the prediction results for different
-`enterThreshold` values, and outputs the average / stdev return of the prediction
-algorithm.
-
-All you need to do is change the `metrics` variable to `DailyMetrics`.
-[Demo3](Demo3.scala) shows the actual code. Try it out with:
-```
-$ cd $PIO_HOME/examples
-$ ../bin/pio-run org.apache.predictionio.examples.stock.Demo3
-```
-
-### Last Words
-The current version is only a proof-of-concept for a stock engine using the
-PredictionIO infrastructure. *A lot of* possible improvements can be made:
-- Use a Spark broadcast variable to deliver the whole dataset, instead of
-  repeatedly copying it to each Query and TrainingData. StockEngine should wrap
-  around the data object, provide the correct slice of data, and prevent
-  look-ahead bias.
-- Better backtesting metrics report. It should also calculate various important
-  metrics like daily / annual return and volatility, the Sharpe ratio, drawdown,
-  etc.
-- Better backtesting method. It should be able to handle cost of capital, margin,
-  transaction costs, variable position sizes, etc.
-- And a lot more.....
-
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/build.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/build.sbt b/examples/experimental/scala-stock/build.sbt
deleted file mode 100644
index d12157f..0000000
--- a/examples/experimental/scala-stock/build.sbt
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "example-scala-stock"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"     %% "core"           % "0.9.1" % "provided",
-  "org.apache.predictionio"     %% "engines"        % "0.9.1" % "provided",
-  "com.github.scopt"  %% "scopt"          % "3.2.0",
-  "commons-io"         % "commons-io"     % "2.4",
-  "org.apache.commons" % "commons-math3"  % "3.3",
-  "org.apache.mahout"  % "mahout-core"    % "0.9",
-  "org.apache.spark"  %% "spark-core"     % "1.2.0" % "provided",
-  "org.apache.spark"  %% "spark-mllib"    % "1.2.0"
-    exclude("org.apache.spark", "spark-core_2.10")
-    exclude("org.eclipse.jetty", "jetty-server"),
-  "org.clapper"       %% "grizzled-slf4j" % "1.0.2",
-  "org.json4s"        %% "json4s-native"  % "3.2.10",
-  "org.scala-saddle"  %% "saddle-core"    % "1.3.2"
-    exclude("ch.qos.logback", "logback-classic"),
-  "org.scalanlp"      %% "breeze"         % "0.9",
-  "org.scalanlp"      %% "breeze-natives" % "0.9",
-  "org.scalanlp"      %% "nak"            % "1.3",
-  "org.scalatest"     %% "scalatest"      % "2.2.0" % "test")
-
-
-mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
-  {
-    case PathList("scala", xs @ _*) => MergeStrategy.discard
-    case PathList("org", "xmlpull", xs @ _*) => MergeStrategy.last
-    case x => old(x)
-  }
-}
-
-lazy val root = (project in file(".")).enablePlugins(SbtTwirl)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/project/assembly.sbt b/examples/experimental/scala-stock/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/experimental/scala-stock/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/project/plugins.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/project/plugins.sbt b/examples/experimental/scala-stock/project/plugins.sbt
deleted file mode 100644
index 66ee665..0000000
--- a/examples/experimental/scala-stock/project/plugins.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.0.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/Algorithm.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/Algorithm.scala b/examples/experimental/scala-stock/src/main/scala/Algorithm.scala
deleted file mode 100644
index 7512da4..0000000
--- a/examples/experimental/scala-stock/src/main/scala/Algorithm.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.predictionio.controller.LAlgorithm
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.broadcast.Broadcast
-import org.apache.predictionio.controller.EmptyParams
-import org.saddle._
-
-import scala.reflect._
-import scala.reflect.runtime.universe._
-
-import scala.collection.immutable.HashMap
-
-abstract class StockStrategy[M: ClassTag]
-  extends LAlgorithm[
-      TrainingData, 
-      (TrainingData, M), 
-      QueryDate, 
-      Prediction] {
-  def train(trainingData: TrainingData): (TrainingData, M) = {
-    (trainingData, createModel(trainingData.view))
-  }
-
-  def createModel(dataView: DataView): M
-
-  def predict(dataModel: (TrainingData, M), queryDate: QueryDate)
-  : Prediction = {
-    val (trainingData, model) = dataModel
-
-    val rawData = trainingData.rawDataB.value
-
-    val dataView: DataView = 
-      rawData.view(queryDate.idx, trainingData.maxWindowSize)
-
-    val active = rawData._activeFrame
-
-    val activeTickers = dataView
-      .activeFrame()
-      .rowAt(0)
-      .filter(identity)
-      .index.toVec.contents
-
-
-    val query = Query(
-      idx = queryDate.idx, 
-      dataView = dataView,
-      tickers = activeTickers,
-      mktTicker = rawData.mktTicker)
-
-    val prediction: Prediction = onClose(model, query)
-
-    return prediction
-  }
-
-  def onClose(model: M, query: Query): Prediction
-}
-
-class EmptyStrategy extends StockStrategy[AnyRef] {
-  def createModel(dataView: DataView): AnyRef = None
-
-  def onClose(model: AnyRef, query: Query): Prediction = 
-    Prediction(HashMap[String, Double]())
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/BackTestingMetrics.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/BackTestingMetrics.scala b/examples/experimental/scala-stock/src/main/scala/BackTestingMetrics.scala
deleted file mode 100644
index c2fbe4b..0000000
--- a/examples/experimental/scala-stock/src/main/scala/BackTestingMetrics.scala
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.controller.Evaluator
-import org.apache.predictionio.controller.NiceRendering
-import com.github.nscala_time.time.Imports._
-import scala.collection.mutable.{ Map => MMap, ArrayBuffer }
-
-import org.json4s._
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization
-//import org.json4s.native.Serialization.{read, write}
-
-import org.apache.predictionio.engines.util.{ EvaluatorVisualization => MV }
-
-import breeze.stats.{ mean, meanAndVariance, MeanAndVariance }
-
-case class BacktestingParams(
-  val enterThreshold: Double,
-  val exitThreshold: Double,
-  val maxPositions: Int = 1,
-  val optOutputPath: Option[String] = None
-) extends Params {}
-
-// prediction is Ticker -> ({1:Enter, -1:Exit}, ActualReturn)
-class DailyResult(
-  //val date: DateTime,
-  val dateIdx: Int,
-  val toEnter: Seq[String],
-  val toExit: Seq[String])
-extends Serializable {}
-
-case class DailyStat (
-  time: Long,
-  nav: Double,
-  ret: Double,
-  market: Double,
-  positionCount: Int
-)
-
-case class OverallStat (
-  ret: Double,
-  vol: Double,
-  sharpe: Double,
-  days: Int
-)
-
-case class BacktestingResult(
-  daily: Seq[DailyStat],
-  overall: OverallStat
-) extends NiceRendering {
-  override def toString(): String = overall.toString
-
-  def toHTML(): String = {
-    implicit val formats = DefaultFormats
-    html.backtesting().toString
-  }
-
-  def toJSON(): String = {
-    implicit val formats = DefaultFormats
-    Serialization.write(this)
-  }
-}
-
-class BacktestingEvaluator(val params: BacktestingParams)
-  extends Evaluator[
-      DataParams, QueryDate, Prediction, AnyRef,
-      DailyResult, Seq[DailyResult], BacktestingResult] {
-
-  def evaluateUnit(queryDate: QueryDate, prediction: Prediction,
-    unusedActual: AnyRef)
-    : DailyResult = {
-
-    val todayIdx = queryDate.idx
-
-    // Decide enter / exit, also sort by pValue desc
-    val data = prediction.data
-    .map { case (ticker, pValue) => {
-      val dir = pValue match {
-        case p if p >= params.enterThreshold => 1
-        case p if p <= params.exitThreshold => -1
-        case _ => 0
-      }
-      (ticker, dir, pValue)
-    }}
-    .toArray
-    .sortBy(-_._3)
-
-    val toEnter = data.filter(_._2 == 1).map(_._1)
-    val toExit = data.filter(_._2 == -1).map(_._1)
-
-    new DailyResult(
-      dateIdx = todayIdx,
-      toEnter = toEnter,
-      toExit = toExit)
-  }
-
-  def evaluateSet(dp: DataParams, input: Seq[DailyResult])
-    : Seq[DailyResult] = input
-
-  def evaluateAll(input: Seq[(DataParams, Seq[DailyResult])])
-  : BacktestingResult = {
-    var dailyResultsSeq = input
-      .map(_._2)
-      .flatten
-      .toArray
-      .sortBy(_.dateIdx)
-
-    var rawData = input.head._1.rawDataB.value
-    val retFrame = rawData._retFrame
-    val priceFrame = rawData._priceFrame
-    val mktTicker = rawData.mktTicker
-
-    val dailyNavs = ArrayBuffer[Double]()
-
-    val dailyStats = ArrayBuffer[DailyStat]()
-
-    val initCash = 1000000.0
-    var cash = initCash
-    // Ticker to current size
-    val positions = MMap[String, Double]()
-    val maxPositions = params.maxPositions
-
-    for (daily <- dailyResultsSeq) {
-      val todayIdx = daily.dateIdx
-      val today = rawData.timeIndex(todayIdx)
-      val todayRet = retFrame.rowAt(todayIdx)
-      val todayPrice = priceFrame.rowAt(todayIdx)
-
-      // Update price change
-      positions.keys.foreach { ticker => {
-        positions(ticker) *= todayRet.first(ticker).get
-      }}
-
-      // Determine exit
-      daily.toExit.foreach { ticker => {
-        if (positions.contains(ticker)) {
-          val money = positions.remove(ticker).get
-          cash += money
-        }
-      }}
-
-      // Determine enter
-      val slack = maxPositions - positions.size
-      val money = cash / slack
-      daily.toEnter
-      .filter(t => !positions.contains(t))
-      .take(slack)
-      .map{ ticker => {
-        cash -= money
-        positions += (ticker -> money)
-      }}
-
-      // Book keeping
-      val nav = cash + positions.values.sum
-
-      val ret = (if (dailyStats.isEmpty) 0 else {
-        val yestStats = dailyStats.last
-        val yestNav = yestStats.nav
-        (nav - yestNav) / yestNav // daily return relative to yesterday's NAV
-        })
-
-      dailyStats.append(DailyStat(
-        time = today.getMillis(),
-        nav = nav,
-        ret = ret,
-        market = todayPrice.first(mktTicker).get,
-        positionCount = positions.size
-      ))
-    }
-    // FIXME. Force Close the last day.
-
-    val lastStat = dailyStats.last
-
-    //val dailyVariance = meanAndVariance(dailyStats.map(_.ret))._2
-    //val dailyVariance = meanAndVariance(dailyStats.map(_.ret))._2
-    val retStats: MeanAndVariance = meanAndVariance(dailyStats.map(_.ret))
-    //val dailyVol = math.sqrt(dailyVariance)
-    //val annualVol = dailyVariance * math.sqrt(252.0)
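-    // Annualize: scale the stdev of daily returns by sqrt(252) trading days,
-    // and compound the total return over a 252-day year below.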
-    val annualVol = retStats.stdDev * math.sqrt(252.0)
-    val n = dailyStats.size
-    val totalReturn = lastStat.nav / initCash
-
-    val annualReturn = math.pow(totalReturn, 252.0 / n) - 1
-    val sharpe = annualReturn / annualVol
-
-    val overall = OverallStat(
-      annualReturn,
-      annualVol,
-      sharpe,
-      n)
-
-    val result = BacktestingResult(
-      daily = dailyStats,
-      overall = overall
-    )
-
-    params.optOutputPath.map { path => MV.save(result, path) }
-
-    result
-  }
-}
-
-object RenderMain {
-  def main(args: Array[String]) {
-    MV.render(MV.load[BacktestingResult](args(0)), args(0))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/Data.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/Data.scala b/examples/experimental/scala-stock/src/main/scala/Data.scala
deleted file mode 100644
index 56b80c7..0000000
--- a/examples/experimental/scala-stock/src/main/scala/Data.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.broadcast.Broadcast
-import org.saddle._
-import org.saddle.index.IndexTime
-import com.github.nscala_time.time.Imports._
-import scala.collection.immutable.HashMap
-
-class RawData(
-  val tickers: Array[String],
-  val mktTicker: String,
-  val timeIndex: Array[DateTime],
-  private[stock] val _price: Array[(String, Array[Double])],
-  private[stock] val _active: Array[(String, Array[Boolean])])
-    extends Serializable {
-
-  @transient lazy val _priceFrame: Frame[DateTime, String, Double] =
-    SaddleWrapper.ToFrame(timeIndex, _price)
-
-  // FIXME. Fill NA of result.
-  @transient lazy val _retFrame: Frame[DateTime, String, Double] = 
-    _priceFrame.shift(1) / _priceFrame
-  
-  @transient lazy val _activeFrame: Frame[DateTime, String, Boolean] =
-    SaddleWrapper.ToFrame(timeIndex, _active)
-
-  def view(idx: Int, maxWindowSize: Int): DataView = 
-    DataView(this, idx, maxWindowSize)
-
-  override def toString(): String = {
-    val timeHead = timeIndex.head
-    val timeLast = timeIndex.last
-    s"RawData[$timeHead, $timeLast, $mktTicker, size=${tickers.size}]"
-  }
-}
-
-// A data view of RawData from [idx - maxWindowSize + 1 : idx]
-// Notice that the last day is *inclusive*.
-// This class takes the whole RawData reference, hence should *not* be serialized
-case class DataView(val rawData: RawData, val idx: Int, val maxWindowSize: Int) {
-  def today(): DateTime = rawData.timeIndex(idx)
-
-  val tickers = rawData.tickers
-  val mktTicker = rawData.mktTicker
-  
-  def priceFrame(windowSize: Int = 1)
-  : Frame[DateTime, String, Double] = {
-    // Check windowSize <= maxWindowSize
-    rawData._priceFrame.rowSlice(idx - windowSize + 1, idx + 1)
-  }
-  
-  def retFrame(windowSize: Int = 1)
-  : Frame[DateTime, String, Double] = {
-    // Check windowSize <= maxWindowSize
-    rawData._retFrame.rowSlice(idx - windowSize + 1, idx + 1)
-  }
-  
-  def activeFrame(windowSize: Int = 1)
-  : Frame[DateTime, String, Boolean] = {
-    // Check windowSize <= maxWindowSize
-    rawData._activeFrame.rowSlice(idx - windowSize + 1, idx + 1)
-  }
-
-  override def toString(): String = {
-    priceFrame().toString
-  }
-}
-
-
-// Training data visible to the user is [untilIdx - windowSize, untilIdx).
-case class TrainingData(
-  untilIdx: Int,
-  maxWindowSize: Int,
-  rawDataB: Broadcast[RawData]) {
- 
-  def view(): DataView = DataView(rawDataB.value, untilIdx - 1, maxWindowSize)
-}
-
-case class DataParams(rawDataB: Broadcast[RawData])
-
-// Date
-case class QueryDate(idx: Int)
-
-case class Query(
-  val idx: Int,
-  val dataView: DataView,
-  val tickers: Array[String],
-  val mktTicker: String)
-
-// Prediction
-case class Prediction(data: HashMap[String, Double])
-
-object SaddleWrapper {
-  def ToFrame[A](
-    timeIndex: Array[DateTime],
-    tickerPriceSeq: Array[(String, Array[A])]
-    )(implicit st: ST[A])
-  : Frame[DateTime, String, A] = {
-    val index = IndexTime(timeIndex:_ *)
-    val seriesList = tickerPriceSeq.map{ case(ticker, price) => {
-      val series = Series(Vec(price), index)
-      (ticker, series)
-    }}
-    Frame(seriesList:_*)
-  }
-
-  def FromFrame[A](data: Frame[DateTime, String, A]
-    ): (Array[DateTime], Array[(String, Array[A])]) = {
-    val timeIndex = data.rowIx.toVec.contents
-    val tickers = data.colIx.toVec.contents
-
-    val tickerDataSeq = tickers.map{ ticker => {
-      (ticker, data.firstCol(ticker).toVec.contents)
-    }}
-
-    (timeIndex, tickerDataSeq)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/DataSource.scala b/examples/experimental/scala-stock/src/main/scala/DataSource.scala
deleted file mode 100644
index b8436e7..0000000
--- a/examples/experimental/scala-stock/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.LDataSource
-import org.apache.predictionio.controller.EmptyParams
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.broadcast.Broadcast
-
-import com.mongodb.casbah.Imports._
-import org.saddle._
-import org.saddle.index.IndexTime
-import com.github.nscala_time.time.Imports._
-
-
-/** Primary parameter for [[[DataSource]]].
-  *
-  * @param baseDate identifies the beginning of our global time window; the
-  * remaining index parameters are day offsets relative to it.
-  * @param fromIdx the first date for testing
-  * @param untilIdx the last date (exclusive) for testing
-  * @param trainingWindowSize number of days used for training
-  * @param maxTestingWindowSize number of days in each testing data set
-  *
-  * [[[DataSource]]] chops data into multiple (overlapping)
-  * pieces. Within each piece, it is further split into a training and a testing
-  * set. The testing sets run from <code>fromIdx</code> until
-  * <code>untilIdx</code> with a step size of <code>maxTestingWindowSize</code>.
-  * A concrete example: (from, until, windowSize) = (100, 150, 20) generates
-  * three testing sets corresponding to the time ranges [100, 120), [120, 140), [140, 150).
-  * For each testing set, it also generates the training data set using
-  * <code>trainingWindowSize</code>. Suppose trainingWindowSize = 50 and the testing set is
-  * [100, 120); the training set then draws data from the time range [50, 100).
-  */
-
-case class DataSourceParams(
-  val appid: Int = 1008,
-  val baseDate: DateTime,
-  val fromIdx: Int,
-  val untilIdx: Int,
-  val trainingWindowSize: Int,
-  val maxTestingWindowSize: Int,
-  val marketTicker: String,
-  val tickerList: Seq[String]) extends Params {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/Indicators.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/Indicators.scala b/examples/experimental/scala-stock/src/main/scala/Indicators.scala
deleted file mode 100644
index a8a0ac1..0000000
--- a/examples/experimental/scala-stock/src/main/scala/Indicators.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.saddle._
-import org.saddle.index.IndexTime
-
-import scala.collection.immutable.HashMap
-import breeze.linalg.DenseMatrix
-import breeze.linalg.DenseVector
-
-import com.github.nscala_time.time.Imports._
-
-import scala.math
-
-import math._
-
-import nak.regress.LinearRegression
-
-/**
-  * Base class for an indicator.
-  *
-  * All indicators should be defined as classes that extend
-  * this base class. See RSIIndicator as an example. These indicators can then
-  * be instantiated and passed into a StockStrategy class. Refer to tutorial 
-  * for further explanation (found in the README.md file).
-  */
-@SerialVersionUID(100L)
-abstract class BaseIndicator extends Serializable {
-  /** Calculates training series for a particular stock.
-    *
-    * @param logPrice series of logarithm of all prices for a particular stock.
-    *         Logarithm values are recommended for more accurate results.
-    * @return the training series of the stock
-    */
-  def getTraining(logPrice: Series[DateTime, Double]): Series[DateTime, Double]
-
-  /** Applies indicator on a window size of the value returned by 
-    * getMinWindowSize() and returns the last value in the resulting series to
-    * be used for prediction in RegressionStrategy.
-    *
-    * @param logPrice series of logarithm of all prices for a particular stock
-    * @return the last value in the resulting series from the feature 
-    *           calculation
-    */
-  def getOne(input: Series[DateTime, Double]): Double
-
-  /** Returns window size to be used in getOne()
-    *
-    * @return the window size
-    */
-  def getMinWindowSize(): Int
-}
-
-/** Indicator that implements a relative strength index formula
-  *
-  * @constructor create an instance of an RSIIndicator
-  * @param rsiPeriod number of days in each averaging period
-  *         used in the RSI calculation (default 14)
-  */
-class RSIIndicator(rsiPeriod: Int = 14) extends BaseIndicator {
-
-  private def getRet(dailyReturn: Series[DateTime, Double]) =
-    (dailyReturn - dailyReturn.shift(1)).fillNA(_ => 0.0)
-
-  def getMinWindowSize(): Int = rsiPeriod + 1
-
-  private def calcRS(logPrice: Series[DateTime, Double])
-    : Series[DateTime, Double] = {
-    //Positive and Negative Vecs
-    val posSeries = logPrice.mapValues[Double]((x: Double) 
-      => if (x > 0) x else 0)
-    val negSeries = logPrice.mapValues[Double]((x: Double) 
-      => if (x < 0) x else 0)
-    
-    //Get the sum of positive/negative Frame
-    val avgPosSeries = 
-      posSeries.rolling[Double] (rsiPeriod, (f: Series[DateTime,Double]) 
-        => f.mean)
-    val avgNegSeries = 
-      negSeries.rolling[Double] (rsiPeriod, (f: Series[DateTime,Double]) 
-        => f.mean)
-
-    val rsSeries = avgPosSeries / avgNegSeries
-    rsSeries
-  }
-
-  // Computes RSI of price data over the defined training window time frame
-  def getTraining(logPrice: Series[DateTime, Double])
-    : Series[DateTime, Double] = {
-    val rsSeries = calcRS(getRet(logPrice))
-    val rsiSeries = rsSeries.mapValues[Double]( 
-        (x:Double) => 100 - ( 100 / (1 + x)))
-
-    // Fill in first 14 days offset with 50 to maintain results
-    rsiSeries.reindex(logPrice.rowIx).fillNA(_  => 50.0)
-  }
-
-    // Computes the RSI for the most recent time frame, returns single double
-  def getOne(logPrice: Series[DateTime, Double]): Double = {
-    getTraining(logPrice).last
-  }
-}
-
-/** Indicator that calculates differences of closing prices
-  *
-  * @constructor create an instance of a ShiftsIndicator
-  * @param period number of days between any 2 closing prices to consider for 
-  *          calculating a return
-  */
-class ShiftsIndicator(period: Int) extends BaseIndicator {
-
-  private def getRet(logPrice: Series[DateTime, Double], frame: Int = period) =
-   (logPrice - logPrice.shift(frame)).fillNA(_ => 0.0)
-
-  def getMinWindowSize(): Int = period + 1
-
-  def getTraining(logPrice: Series[DateTime, Double])
-    : Series[DateTime, Double] = {
-    getRet(logPrice)
-  }
-
-  def getOne(logPrice: Series[DateTime, Double]): Double = {
-    getRet(logPrice).last
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/RegressionStrategy.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/RegressionStrategy.scala b/examples/experimental/scala-stock/src/main/scala/RegressionStrategy.scala
deleted file mode 100644
index 7647f6d..0000000
--- a/examples/experimental/scala-stock/src/main/scala/RegressionStrategy.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.predictionio.controller.Params
-
-import org.saddle._
-import org.saddle.index.IndexTime
-
-import scala.collection.immutable.HashMap
-import breeze.linalg.DenseMatrix
-import breeze.linalg.DenseVector
-
-import com.github.nscala_time.time.Imports._
-
-import scala.math
-
-import nak.regress.LinearRegression
-
-/** Regression Strategy parameters case class
-  *
-  * @param indicators a sequence of tuples. The first element is a string that
-  *        can be helpful in acting as a label for the indicator,
-  *        such as for debugging. Pass in empty string if no label
-  *        desired.
-  * @param maxTrainingWindowSize maximum window size of price frame desired
-  *        for training
-  */
-case class RegressionStrategyParams (
-  indicators: Seq[(String, BaseIndicator)],
-  maxTrainingWindowSize: Int
-) extends Params
-
-class RegressionStrategy (params: RegressionStrategyParams) 
-  extends StockStrategy[Map[String, DenseVector[Double]]] {
-
-  private def getRet(logPrice: Frame[DateTime, String, Double], d: Int) =
-    (logPrice - logPrice.shift(d)).mapVec[Double](_.fillNA(_ => 0.0))
-
-  // Regress on specific ticker
-  private def regress(
-    calculatedData: Seq[Series[DateTime, Double]],
-    retF1d: Series[DateTime, Double]) = {
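-    // Build a column-major data array: one column per indicator training
-    // series, plus a trailing column of ones for the intercept term.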
-    val array = (
-      calculatedData.map(_.toVec.contents).reduce(_++_) ++
-      Array.fill(retF1d.length)(1.0)).toArray[Double]
-    val target = DenseVector[Double](retF1d.toVec.contents)
-    val m = DenseMatrix.create[Double](
-      retF1d.length, 
-      calculatedData.length + 1, 
-      array
-    )
-    val result = LinearRegression.regress(m, target)
-    result
-  }
-
-  // Compute each indicator value for training the model
-  private def calcIndicator(logPrice: Series[DateTime, Double]):
-     Seq[Series[DateTime, Double]] = {
-    params.indicators.map { 
-      case(name, indicator) => indicator.getTraining(logPrice)
-    }
-  }
-
-  // Get max period from series of indicators
-  private def getMaxPeriod() : Int = {
-    // create an array of periods
-    val periods = params.indicators.map { 
-      case(name, indicator) => indicator.getMinWindowSize() 
-    }
-    periods.max
-  }
-
-  // Apply regression algorithm on complete dataset to create a model
-  def createModel(dataView: DataView): Map[String, DenseVector[Double]] = {
-    // price: row is time, col is ticker, values are prices
-    val price = dataView.priceFrame(params.maxTrainingWindowSize)
-    val logPrice = price.mapValues(math.log)
-    val active = dataView.activeFrame(params.maxTrainingWindowSize)
-
-    // value used to query prediction results
-    val retF1d = getRet(logPrice, -1)
-
-    val timeIndex = price.rowIx
-    val firstIdx = getMaxPeriod() + 3
-    val lastIdx = timeIndex.length
-
-    // Get array of ticker strings
-    val tickers = price.colIx.toVec.contents
-
-    // For each active ticker, pass in trained series into regress
-    val tickerModelMap = tickers
-    .filter(ticker => (active.firstCol(ticker).findOne(_ == false) == -1))
-    .map(ticker => {
-      val model = regress(
-        calcIndicator(price.firstCol(ticker)).map(_.slice(firstIdx, lastIdx)),
-        retF1d.firstCol(ticker).slice(firstIdx, lastIdx))
-      (ticker, model)
-    }).toMap
-
-    // tickers mapped to model
-    tickerModelMap
-  }
-
-  // returns a prediction for a specific ticker
-  private def predictOne(
-    coef: DenseVector[Double],
-    ticker: String,
-    dataView: DataView): Double = {
-
-    val vecArray = params.indicators.map { case (name, indicator) => {
-      val price = dataView.priceFrame(indicator.getMinWindowSize())
-      val logPrice = price.mapValues(math.log)
-      indicator.getOne(logPrice.firstCol(ticker))
-    }}.toArray
-
-    val densVecArray = vecArray ++ Array[Double](1)
-    val vec = DenseVector[Double](densVecArray)
-  
-    val p = coef.dot(vec)
-    return p
-  }
-
-  // Returns a mapping of tickers to predictions
-  def onClose(model: Map[String, DenseVector[Double]], query: Query)
-  : Prediction = {
-    val dataView = query.dataView
-
-    val prediction = query.tickers
-      .filter(ticker => model.contains(ticker))
-      .map { ticker => {
-        val p = predictOne(
-          model(ticker),
-          ticker,
-          dataView)
-        (ticker, p)
-      }}
-
-    Prediction(HashMap[String, Double](prediction:_*))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/Run.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/Run.scala b/examples/experimental/scala-stock/src/main/scala/Run.scala
deleted file mode 100644
index 32c5d14..0000000
--- a/examples/experimental/scala-stock/src/main/scala/Run.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-import org.apache.predictionio.controller.Workflow
-import org.apache.predictionio.controller.WorkflowParams
-import org.apache.predictionio.controller.PIdentityPreparator
-import org.apache.predictionio.controller.EmptyParams
-import org.apache.predictionio.controller.LFirstServing
-import org.apache.predictionio.controller.Params
-import com.github.nscala_time.time.Imports._
-import scala.collection.immutable.HashMap
-import java.io.File
-
-// Buy if the l-day daily return is higher than the s-day daily return
-case class MomentumStrategyParams(val l: Int, val s: Int) extends Params
-
-class MomentumStrategy(val p: MomentumStrategyParams)
-  extends StockStrategy[AnyRef] {
-
-  def createModel(dataView: DataView): AnyRef = None
-
-  def onClose(model: AnyRef, query: Query): Prediction = {
-    val dataView = query.dataView
-
-    val priceFrame = dataView.priceFrame(p.l + 1)
-    val todayLgPrice = priceFrame.rowAt(p.l).mapValues(math.log)
-    val lLgPrice = priceFrame.rowAt(0).mapValues(math.log)
-    val sLgPrice = priceFrame.rowAt(p.l - p.s).mapValues(math.log)
-
-    val sLgRet = (todayLgPrice - sLgPrice) / p.s
-    val lLgRet = (todayLgPrice - lLgPrice) / p.l
-
-    val output = query.tickers
-    .map { ticker => {
-      val s = sLgRet.first(ticker)
-      val l = lLgRet.first(ticker)
-      val p = l - s
-      (ticker, p)
-    }}
-
-    Prediction(data = HashMap(output:_*))
-  }
-}
-
-
-object Run {
-  val sp500List = Vector(
-    "A", "AA", "AAPL", "ABBV", "ABC", "ABT", "ACE", "ACN", "ACT", "ADBE", "ADI",
-    "ADM", "ADP", "ADS", "ADSK", "ADT", "AEE", "AEP", "AES", "AET", "AFL",
-    "AGN", "AIG", "AIV", "AIZ", "AKAM", "ALL", "ALLE", "ALTR", "ALXN", "AMAT",
-    "AME", "AMGN", "AMP", "AMT", "AMZN", "AN", "AON", "APA", "APC", "APD",
-    "APH", "ARG", "ATI", "AVB", "AVP", "AVY", "AXP", "AZO", "BA", "BAC", "BAX",
-    "BBBY", "BBT", "BBY", "BCR", "BDX", "BEAM", "BEN", "BF-B", "BHI", "BIIB",
-    "BK", "BLK", "BLL", "BMS", "BMY", "BRCM", "BRK-B", "BSX", "BTU", "BWA",
-    "BXP", "C", "CA", "CAG", "CAH", "CAM", "CAT", "CB", "CBG", "CBS", "CCE",
-    "CCI", "CCL", "CELG", "CERN", "CF", "CFN", "CHK", "CHRW", "CI", "CINF",
-    "CL", "CLX", "CMA", "CMCSA", "CME", "CMG", "CMI", "CMS", "CNP", "CNX",
-    "COF", "COG", "COH", "COL", "COP", "COST", "COV", "CPB", "CRM", "CSC",
-    "CSCO", "CSX", "CTAS", "CTL", "CTSH", "CTXS", "CVC", "CVS", "CVX", "D",
-    "DAL", "DD", "DE", "DFS", "DG", "DGX", "DHI", "DHR", "DIS", "DISCA", "DLPH",
-    "DLTR", "DNB", "DNR", "DO", "DOV", "DOW", "DPS", "DRI", "DTE", "DTV", "DUK",
-    "DVA", "DVN", "EA", "EBAY", "ECL", "ED", "EFX", "EIX", "EL", "EMC", "EMN",
-    "EMR", "EOG", "EQR", "EQT", "ESRX", "ESS", "ESV", "ETFC", "ETN", "ETR",
-    "EW", "EXC", "EXPD", "EXPE", "F", "FAST", "FB", "FCX", "FDO", "FDX", "FE",
-    "FFIV", "FIS", "FISV", "FITB", "FLIR", "FLR", "FLS", "FMC", "FOSL", "FOXA",
-    "FRX", "FSLR", "FTI", "FTR", "GAS", "GCI", "GD", "GE", "GGP", "GHC", "GILD",
-    "GIS", "GLW", "GM", "GMCR", "GME", "GNW", "GOOG", "GOOGL", "GPC", "GPS",
-    "GRMN", "GS", "GT", "GWW", "HAL", "HAR", "HAS", "HBAN", "HCBK", "HCN",
-    "HCP", "HD", "HES", "HIG", "HOG", "HON", "HOT", "HP", "HPQ", "HRB", "HRL",
-    "HRS", "HSP", "HST", "HSY", "HUM", "IBM", "ICE", "IFF", "IGT", "INTC",
-    "INTU", "IP", "IPG", "IR", "IRM", "ISRG", "ITW", "IVZ", "JBL", "JCI", "JEC",
-    "JNJ", "JNPR", "JOY", "JPM", "JWN", "K", "KEY", "KIM", "KLAC", "KMB", "KMI",
-    "KMX", "KO", "KORS", "KR", "KRFT", "KSS", "KSU", "L", "LB", "LEG", "LEN",
-    "LH", "LLL", "LLTC", "LLY", "LM", "LMT", "LNC", "LO", "LOW", "LRCX", "LSI",
-    "LUK", "LUV", "LYB", "M", "MA", "MAC", "MAR", "MAS", "MAT", "MCD", "MCHP",
-    "MCK", "MCO", "MDLZ", "MDT", "MET", "MHFI", "MHK", "MJN", "MKC", "MMC",
-    "MMM", "MNST", "MO", "MON", "MOS", "MPC", "MRK", "MRO", "MS", "MSFT", "MSI",
-    "MTB", "MU", "MUR", "MWV", "MYL", "NBL", "NBR", "NDAQ", "NE", "NEE", "NEM",
-    "NFLX", "NFX", "NI", "NKE", "NLSN", "NOC", "NOV", "NRG", "NSC", "NTAP",
-    "NTRS", "NU", "NUE", "NVDA", "NWL", "NWSA", "OI", "OKE", "OMC", "ORCL",
-    "ORLY", "OXY", "PAYX", "PBCT", "PBI", "PCAR", "PCG", "PCL", "PCLN", "PCP",
-    "PDCO", "PEG", "PEP", "PETM", "PFE", "PFG", "PG", "PGR", "PH", "PHM", "PKI",
-    "PLD", "PLL", "PM", "PNC", "PNR", "PNW", "POM", "PPG", "PPL", "PRGO", "PRU",
-    "PSA", "PSX", "PVH", "PWR", "PX", "PXD", "QCOM", "QEP", "R", "RAI", "RDC",
-    "REGN", "RF", "RHI", "RHT", "RIG", "RL", "ROK", "ROP", "ROST", "RRC", "RSG",
-    "RTN", "SBUX", "SCG", "SCHW", "SE", "SEE", "SHW", "SIAL", "SJM", "SLB",
-    "SLM", "SNA", "SNDK", "SNI", "SO", "SPG", "SPLS", "SRCL", "SRE", "STI",
-    "STJ", "STT", "STX", "STZ", "SWK", "SWN", "SWY", "SYK", "SYMC", "SYY", "T",
-    "TAP", "TDC", "TE", "TEG", "TEL", "TGT", "THC", "TIF", "TJX", "TMK", "TMO",
-    "TRIP", "TROW", "TRV", "TSCO", "TSN", "TSO", "TSS", "TWC", "TWX", "TXN",
-    "TXT", "TYC", "UNH", "UNM", "UNP", "UPS", "URBN", "USB", "UTX", "V", "VAR",
-    "VFC", "VIAB", "VLO", "VMC", "VNO", "VRSN", "VRTX", "VTR", "VZ", "WAG",
-    "WAT", "WDC", "WEC", "WFC", "WFM", "WHR", "WIN", "WLP", "WM", "WMB", "WMT",
-    "WU", "WY", "WYN", "WYNN", "X", "XEL", "XL", "XLNX", "XOM", "XRAY", "XRX",
-    "XYL", "YHOO", "YUM", "ZION", "ZMH", "ZTS")
-
-  val tickerList = Seq(
-      "GOOG", "GOOGL", "FB", "AAPL", "AMZN", "MSFT", "IBM", "HPQ", "INTC",
-      "NTAP", "CSCO", "ORCL", "XRX", "YHOO", "AMAT", "QCOM", "TXN", "CRM",
-      "INTU", "WDC", "SNDK")
-
-  def main(args: Array[String]) {
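-    // The `if (false)` toggle below selects the full sp500List branch as written;
-    // change it to true to run on the smaller tech tickerList instead.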
-    val dataSourceParams = (if (false) {
-        new DataSourceParams(
-          baseDate = new DateTime(2002, 1, 1, 0, 0),
-          fromIdx = 300,
-          untilIdx = 2000,
-          trainingWindowSize = 200,
-          maxTestingWindowSize = 20,
-          marketTicker = "SPY",
-          tickerList = tickerList)
-      } else {
-        // Need to pass "--driver-memory 8G" to `pio run`, since the full
-        // S&P 500 list requires a lot of driver memory.
-        new DataSourceParams(
-          baseDate = new DateTime(2002, 1, 1, 0, 0),
-          fromIdx = 300,
-          untilIdx = 2000,
-          trainingWindowSize = 200,
-          maxTestingWindowSize = 20,
-          marketTicker = "SPY",
-          tickerList = sp500List)
-      })
-
-    val momentumParams = MomentumStrategyParams(20, 3)
-
-    val evaluatorParams = BacktestingParams(
-      enterThreshold = 0.01,
-      exitThreshold = 0.0,
-      maxPositions = 10,
-      optOutputPath = Some(new File("metrics_results").getCanonicalPath)
-    )
-
-    Workflow.run(
-      dataSourceClassOpt = Some(classOf[YahooDataSource]),
-      dataSourceParams = dataSourceParams,
-      preparatorClassOpt = Some(PIdentityPreparator(classOf[YahooDataSource])),
-      algorithmClassMapOpt = Some(Map(
-        "" -> classOf[MomentumStrategy]
-        //"" -> classOf[RegressionStrategy]
-      )),
-      // momentumParams matches MomentumStrategy; supply RegressionStrategyParams
-      // instead if RegressionStrategy is enabled above.
-      algorithmParamsList = Seq(("", momentumParams)),
-      servingClassOpt = Some(LFirstServing(classOf[EmptyStrategy])),
-      evaluatorClassOpt = Some(classOf[BacktestingEvaluator]),
-      evaluatorParams = evaluatorParams,
-      params = WorkflowParams(
-        verbose = 0,
-        batch = "Imagine: Stock II"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/scala/YahooDataSource.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/scala/YahooDataSource.scala b/examples/experimental/scala-stock/src/main/scala/YahooDataSource.scala
deleted file mode 100644
index d23d402..0000000
--- a/examples/experimental/scala-stock/src/main/scala/YahooDataSource.scala
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.stock
-
-// YahooDataSource reads the PredictionIO event store directly.
-
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.view.LBatchView
-import org.apache.predictionio.data.storage.DataMap
-
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-import com.github.nscala_time.time.Imports._
-
-import scala.collection.mutable.{ Map => MMap }
-import scala.collection.GenMap
-
-import org.apache.predictionio.controller._
-import org.apache.predictionio.controller.{ Params => BaseParams }
-
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.broadcast.Broadcast
-
-import org.json4s._
-//import org.saddle._
-
-case class HistoricalData(
-  ticker: String,
-  timeIndex: Array[DateTime],
-  close: Array[Double],
-  adjClose: Array[Double],
-  adjReturn: Array[Double],
-  volume: Array[Double],
-  active: Array[Boolean]) {
-
-  override def toString(): String = {
-    s"HistoricalData($ticker, ${timeIndex.head}, ${timeIndex.last}, " +
-      s"${close.last})"
-  }
-
-  def toDetailedString(): String = {
-    val adjCloseStr = adjClose.mkString("[", ", ", "]")
-    val adjReturnStr = adjReturn.mkString("[", ", ", "]")
-    val activeStr = active.mkString("[", ", ", "]")
-
-    (s"HistoricalData($ticker, ${timeIndex.head}, ${timeIndex.last}, \n" +
-      s"  adjClose=$adjCloseStr\n" +
-      s"  adjReturn=$adjReturnStr\n" +
-      s"  active=$activeStr)")
-  }
-}
-
-object HistoricalData {
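-  /** Builds a placeholder series aligned to timeIndex: all values zero, every day inactive. */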
-  def apply(ticker: String, timeIndex: Array[DateTime]): HistoricalData = {
-    val n = timeIndex.size
-    HistoricalData(
-      ticker,
-      timeIndex,
-      close = Array.fill(n)(0.0),
-      adjClose = Array.fill(n)(0.0),
-      adjReturn = Array.fill(n)(0.0),
-      volume = Array.fill(n)(0.0),
-      active = Array.fill(n)(false))
-  }
-}
-
-
-class YahooDataSource(val params: YahooDataSource.Params)
-  extends PDataSource[
-      RDD[TrainingData],
-      DataParams,
-      QueryDate,
-      AnyRef] {
-  @transient lazy val batchView = new LBatchView(
-    params.appId, params.startTime, params.untilTime)
-
-  val timezone = DateTimeZone.forID("US/Eastern")
-  val windowParams = params.windowParams
-  val marketTicker = windowParams.marketTicker
-
-  @transient lazy val market: HistoricalData = getTimeIndex()
-  @transient lazy val timeIndex: Array[DateTime] = market.timeIndex
-  @transient lazy val timeIndexSet: Set[DateTime] = timeIndex.toSet
-
-  def merge(intermediate: YahooDataSource.Intermediate, e: Event,
-    timeIndexSetOpt: Option[Set[DateTime]])
-  : YahooDataSource.Intermediate = {
-    val dm: DataMap = e.properties
-
-    // TODO: Check ticker in intermediate
-
-    val yahooData = dm.get[JObject]("yahoo")
-
-    // used by json4s "extract" method.
-    implicit val formats = DefaultFormats
-
-    val closeList = (yahooData \ "close").extract[Array[Double]]
-    val adjCloseList = (yahooData \ "adjclose").extract[Array[Double]]
-    val volumeList = (yahooData \ "volume").extract[Array[Double]]
-
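-    // Yahoo timestamps are epoch seconds; convert to milliseconds for JodaTime in US/Eastern.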
-    val tList: Array[DateTime] = (yahooData \ "t").extract[Array[Long]]
-      .map(t => new DateTime(t * 1000, timezone))
-
-    // Add a data point only if either
-    // 1. a time index is supplied and t is in it, or
-    // 2. no time index is supplied (timeIndexSetOpt is None).
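-    // The first point is dropped because computing adjReturn needs the previous day's adjClose.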
-    val newDailyMap: Map[DateTime, YahooDataSource.Daily] =
-      tList.zipWithIndex.drop(1)
-      .filter { case (t, idx) => timeIndexSetOpt.map(_(t)).getOrElse(true) }
-      .map { case (t, idx) =>
-        val adjReturn = (adjCloseList(idx) / adjCloseList(idx - 1)) - 1
-
-        val daily = YahooDataSource.Daily(
-          close = closeList(idx),
-          adjClose = adjCloseList(idx),
-          adjReturn = adjReturn,
-          volume = volumeList(idx),
-          active = true,
-          prevDate = tList(idx - 1))
-
-        (t -> daily)
-      }
-      .toMap
-
-    YahooDataSource.Intermediate(
-      ticker = e.entityId,
-      dailyMap = intermediate.dailyMap ++ newDailyMap)
-  }
-
-  def mergeTimeIndex(intermediate: YahooDataSource.Intermediate, e: Event)
-  : YahooDataSource.Intermediate = merge(intermediate, e, None)
-
-  def mergeStock(intermediate: YahooDataSource.Intermediate, e: Event)
-  : YahooDataSource.Intermediate = merge(intermediate, e, Some(timeIndexSet))
-
-  def finalizeTimeIndex(intermediate: YahooDataSource.Intermediate)
-  : HistoricalData = {
-    val dailyMap = intermediate.dailyMap
-    val ticker = intermediate.ticker
-
-    // Construct the time index with windowParams
-    val timeIndex: Array[DateTime] = dailyMap.keys.toArray
-      .filter(_.isAfter(params.windowParams.baseDate))
-      .sortBy(identity)
-      .take(params.windowParams.untilIdx)
-
-    // Check that the time index is continuous
-    (1 until timeIndex.size).foreach { idx => {
-      require(dailyMap(timeIndex(idx)).prevDate == timeIndex(idx - 1),
-        s"Time must be continuous. " +
-        s"For ticker $ticker, there is a gap between " +
-        s"${timeIndex(idx - 1)} and ${timeIndex(idx)}. " +
-        s"Please import data to cover the gap or use a shorter range.")
-    }}
-
-    val adjReturn = timeIndex.map(t => dailyMap(t).adjReturn)
-
-    HistoricalData(
-      ticker = ticker,
-      timeIndex = timeIndex,
-      close = timeIndex.map(t => dailyMap(t).close),
-      adjClose = return2Close(adjReturn),
-      adjReturn = adjReturn,
-      volume = timeIndex.map(t => dailyMap(t).volume),
-      active = Array.fill(timeIndex.size)(true))
-  }
-
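-  // Rebuilds a synthetic price series from a return series, starting at `base`:
-  // price(i) = base * (1 + returns(0)) * ... * (1 + returns(i)).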
-  def return2Close(returns: Array[Double], base: Double = 100.0)
-  : Array[Double] = {
-    var v = base
-    returns.map { ret =>
-      v *= (1 + ret)
-      v
-    }
-  }
-
-  // Traverse the timeIndex to construct the actual time series using dailyMap
-  // and extra fillNA logic.
-  //
-  // The time series is constructed in the same order as the global timeIndex
-  // array. For a datetime t, if dailyMap contains data for t, valueFunc
-  // extracts the value; otherwise fillNAFunc is called with the last
-  // extracted value (if any) to produce a default.
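-  // For example, finalizeStock below uses valueFunc = _.adjReturn with
-  // fillNAFunc = _ => 0.0, so a missing day contributes a flat (zero) return.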
-  def activeFilter[A : Manifest](
-    dailyMap: GenMap[DateTime, YahooDataSource.Daily],
-    valueFunc: YahooDataSource.Daily => A,
-    fillNAFunc: Option[A] => A) : Array[A] = {
-
-    var lastOpt: Option[A] = None
-
-    timeIndex
-    .map { t =>
-      if (dailyMap.contains(t)) {
-        val v = valueFunc(dailyMap(t))
-        lastOpt = Some(v)
-        v
-      } else {
-        fillNAFunc(lastOpt)
-      }
-    }
-    .toArray
-  }
-
-  def finalizeStock(intermediate: YahooDataSource.Intermediate)
-  : HistoricalData = {
-    val dailyMap = intermediate.dailyMap
-
-    val adjReturn = activeFilter[Double](dailyMap, _.adjReturn, _ => 0.0)
-
-    HistoricalData(
-      ticker = intermediate.ticker,
-      timeIndex = timeIndex,
-      close = activeFilter[Double](dailyMap, _.close, _.getOrElse(0.0)),
-      adjClose = return2Close(adjReturn),
-      adjReturn = adjReturn,
-      volume = activeFilter[Double](dailyMap, _.volume, _ => 0.0),
-      active = activeFilter[Boolean](dailyMap, _.active, _ => false))
-  }
-
-  def getTimeIndex(): HistoricalData = {
-    // Extract only the market ticker; it serves as the reference for market trading days.
-    val predicate = (e: Event) =>
-      (e.entityType == params.entityType && e.entityId == marketTicker)
-
-    val tickerMap: Map[String, HistoricalData] = batchView
-      .events
-      .filter(predicate)
-      .aggregateByEntityOrdered(
-        //predicate,
-        YahooDataSource.Intermediate(),
-        mergeTimeIndex)
-      .mapValues(finalizeTimeIndex)
-
-    tickerMap(marketTicker)
-  }
-
-  def getHistoricalDataSet()
-  : Map[String, HistoricalData] = {
-    // Extract the market ticker again here, treated as an ordinary stock.
-    val tickerSet = (windowParams.tickerList :+ windowParams.marketTicker).toSet
-    val predicate = (e: Event) =>
-      (e.entityType == params.entityType && tickerSet(e.entityId))
-
-    val defaultTickerMap: Map[String, HistoricalData] =
-      params.windowParams.tickerList.map {
-        ticker => (ticker -> HistoricalData(ticker, timeIndex))
-      }
-      .toMap
-
-    val tickerMap: Map[String, HistoricalData] = batchView
-      .events
-      .filter(predicate)
-      .aggregateByEntityOrdered(
-        //predicate,
-        YahooDataSource.Intermediate(),
-        mergeStock)
-      .mapValues(finalizeStock)
-
-    /*
-    tickerMap.map { case (ticker, data) =>
-      println(ticker)
-      println(data.toDetailedString)
-    }
-    */
-
-    defaultTickerMap ++ tickerMap
-  }
-
-  def getRawData(tickerMap: Map[String, HistoricalData]): RawData = {
-    val allTickers = windowParams.tickerList :+ windowParams.marketTicker
-
-    val price: Array[(String, Array[Double])] = allTickers
-      .map { ticker => (ticker, tickerMap(ticker).adjClose) }
-      .toArray
-
-    val active: Array[(String, Array[Boolean])] = allTickers
-      .map { ticker => (ticker, tickerMap(ticker).active) }
-      .toArray
-
-    new RawData(
-      tickers = windowParams.tickerList.toArray,
-      mktTicker = windowParams.marketTicker,
-      timeIndex = timeIndex,
-      _price = price,
-      _active = active)
-  }
-
-  override
-  def read(sc: SparkContext)
-  : Seq[(RDD[TrainingData], DataParams, RDD[(QueryDate, AnyRef)])] = {
-    val historicalSet = getHistoricalDataSet()
-    //data.foreach { println }
-    val rawData = getRawData(historicalSet)
-    //println(rawData)
-
-    // Broadcast it.
-    val rawDataB = sc.broadcast(rawData)
-
-    val dataParams = DataParams(rawDataB)
-
-    val dsp: DataSourceParams = windowParams
-
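-    // Build rolling evaluation splits: each slice trains on up to trainingWindowSize
-    // days before idx and queries the following maxTestingWindowSize days.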
-    val dataSet: Seq[(TrainingData, Seq[(QueryDate, AnyRef)])] =
-      Range(dsp.fromIdx, dsp.untilIdx, dsp.maxTestingWindowSize).map { idx => {
-        val trainingData = TrainingData(
-          untilIdx = idx,
-          maxWindowSize = dsp.trainingWindowSize,
-          rawDataB = rawDataB)
-
-        // Cannot evaluate the last item because the data view only lasts until untilIdx.
-        val testingUntilIdx = math.min(
-          idx + dsp.maxTestingWindowSize,
-          dsp.untilIdx - 1)
-
-        val queries = (idx until testingUntilIdx)
-          .map { idx => (QueryDate(idx), None) }
-        (trainingData, queries)
-      }}
-
-    dataSet.map { case (trainingData, queries) =>
-      /*
-      (dataParams,
-        sc.parallelize(Array(trainingData)),
-        sc.parallelize(queries))
-      */
-      (
-        sc.parallelize(Array(trainingData)),
-        dataParams,
-        sc.parallelize(queries))
-    }
-  }
-}
-
-object YahooDataSource {
-  case class Params(
-    windowParams: DataSourceParams,
-    // The fields below are Data-API-specific filters.
-    appId: Int,  // Used instead of the appId in DataSourceParams, which is ignored
-    entityType: String,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None
-  ) extends BaseParams
-
-  case class Daily(
-    close: Double,
-    adjClose: Double,
-    adjReturn: Double,
-    volume: Double,
-    active: Boolean,
-    // prevDate is used to verify continuity
-    prevDate: DateTime)
-
-  /** Intermediate storage used while constructing historical data.
-    * @param ticker the stock ticker being accumulated
-    * @param dailyMap per-day data keyed by trading date
-    */
-  case class Intermediate(
-    ticker: String = "",
-    dailyMap: Map[DateTime, Daily] = Map[DateTime, Daily]()
-    ) {
-    override def toString(): String =
-      s"YDS.Intermediate($ticker, size=${dailyMap.size})"
-  }
-
-
-  /*
-  def main(args: Array[String]) {
-    val params = Params(
-      appId = 1,
-      untilTime = Some(new DateTime(2014, 5, 1, 0, 0)))
-      //untilTime = None)
-
-    val ds = new YahooDataSource(params)
-    //ds.read
-  }
-  */
-}
-
-object PredefinedDSP {
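-  // Predefined data source parameters: the full S&P 500 list, a 25-ticker
-  // subset, and a small short-range configuration for quick tests.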
-  val BigSP500 = YahooDataSource.Params(
-    appId = 2,
-    entityType = "yahoo",
-    untilTime = None,
-    windowParams = DataSourceParams(
-      baseDate = new DateTime(2000, 1, 1, 0, 0),
-      fromIdx = 250,
-      untilIdx = 3500,
-      trainingWindowSize = 200,
-      maxTestingWindowSize = 30,
-      marketTicker = "SPY",
-      tickerList = Run.sp500List))
-
-  val SmallSP500 = YahooDataSource.Params(
-    appId = 4,
-    entityType = "yahoo",
-    untilTime = None,
-    windowParams = DataSourceParams(
-      baseDate = new DateTime(2000, 1, 1, 0, 0),
-      fromIdx = 250,
-      untilIdx = 3500,
-      trainingWindowSize = 200,
-      maxTestingWindowSize = 30,
-      marketTicker = "SPY",
-      tickerList = Run.sp500List.take(25)))
-
-  val Test = YahooDataSource.Params(
-    appId = 4,
-    entityType = "yahoo",
-    untilTime = Some(new DateTime(2014, 5, 1, 0, 0)),
-    windowParams = DataSourceParams(
-      baseDate = new DateTime(2014, 1, 1, 0, 0),
-      fromIdx = 20,
-      untilIdx = 50,
-      trainingWindowSize = 15,
-      maxTestingWindowSize = 10,
-      marketTicker = "SPY",
-      tickerList = Seq("AAPL", "MSFT", "IBM", "FB", "AMZN", "IRONMAN")))
-}
-
-object YahooDataSourceRun {
-
-  def main(args: Array[String]) {
-    // Make sure you have a lot of memory.
-    // --driver-memory 12G
-
-    // val dsp = PredefinedDSP.BigSP500
-    val dsp = PredefinedDSP.SmallSP500
-    //val dsp = PredefinedDSP.Test
-
-    val momentumParams = MomentumStrategyParams(20, 3)
-
-    //val x =  Series(Vec(1,2,3))
-    //println(x)
-
-    val metricsParams = BacktestingParams(
-      enterThreshold = 0.01,
-      exitThreshold = 0.0,
-      maxPositions = 10//,
-      //optOutputPath = Some(new File("metrics_results").getCanonicalPath)
-    )
-
-    Workflow.run(
-      dataSourceClassOpt = Some(classOf[YahooDataSource]),
-      dataSourceParams = dsp,
-      preparatorClassOpt = Some(IdentityPreparator(classOf[YahooDataSource])),
-      algorithmClassMapOpt = Some(Map(
-        //"" -> classOf[MomentumStrategy]
-        "" -> classOf[RegressionStrategy]
-      )),
-      //algorithmParamsList = Seq(("", momentumParams)),
-      algorithmParamsList = Seq(("", RegressionStrategyParams(Seq[(String, BaseIndicator)](
-        ("RSI1", new RSIIndicator(rsiPeriod=1)), 
-        ("RSI5", new RSIIndicator(rsiPeriod=5)), 
-        ("RSI22", new RSIIndicator(rsiPeriod=22))), 
-      200))),
-      servingClassOpt = Some(LFirstServing(classOf[EmptyStrategy])),
-      evaluatorClassOpt = Some(classOf[BacktestingEvaluator]),
-      evaluatorParams = metricsParams,
-      params = WorkflowParams(
-        verbose = 0,
-        saveModel = false,
-        batch = "Imagine: Stock III"))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-stock/src/main/twirl/io/prediction/examples/stock/backtesting.scala.html
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-stock/src/main/twirl/io/prediction/examples/stock/backtesting.scala.html b/examples/experimental/scala-stock/src/main/twirl/io/prediction/examples/stock/backtesting.scala.html
deleted file mode 100644
index ad0552f..0000000
--- a/examples/experimental/scala-stock/src/main/twirl/io/prediction/examples/stock/backtesting.scala.html
+++ /dev/null
@@ -1,125 +0,0 @@
-@import org.apache.predictionio.examples.stock.BacktestingResult
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<html>
-  <head>
-    <script type='text/javascript' src='http://www.google.com/jsapi'></script>
-    <script
-      src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script> 
-    <script type='text/javascript'>
-      google.load('visualization', '1', 
-          {'packages':['annotatedtimeline', 'table', 'corechart']});
-    </script>
-  </head>
-  <body>
-    <h3>Backtesting Result</h3>
-    <div id='overall' style='width:1024px;'></div>
-
-    <div id='t_timeline' style='width: 1024px; height: 360px;'></div>
-    <div id='b_timeline' style='width: 1024px; height: 200px;'></div>
-    <!-- The event listener only works when served from an HTTP server -->
-
-    <script>
-      google.setOnLoadCallback(draw);
-      var tChart;
-      var bChart;
-      
-      var rawData; 
-
-      function draw() {
-        var jsonData = $.ajax({
-          url: 'evaluator_results.json',
-          dataType: 'json',
-          async: false,
-        }).responseText; 
-
-        rawData = JSON.parse(jsonData);
-
-        drawOverall();
-        drawTimeline();
-      }
-
-      function drawOverall() {
-        var data = new google.visualization.DataTable();
-        data.addColumn('number', 'ret');
-        data.addColumn('number', 'vol');
-        data.addColumn('number', 'sharpe');
-        data.addColumn('number', 'days');
-
-        var overall = rawData.overall;
-        data.addRow([overall.ret, overall.vol, overall.sharpe, overall.days]);
-
-        var formatter = new google.visualization.NumberFormat(
-          {fractionDigits: 4});
-        formatter.format(data, 0); 
-        formatter.format(data, 1); 
-        formatter.format(data, 2); 
-
-        var overallDiv = document.getElementById('overall');
-        var overallTable = new google.visualization.Table(overallDiv);
-        overallTable.draw(data, {showRowNumber: false});
-      }
-
-      function drawTimeline() {
-        var tData = new google.visualization.DataTable();
-        tData.addColumn('date', 'Date');
-        tData.addColumn('number', 'Nav');
-        tData.addColumn('number', 'Market');
-
-        var dailyStats = rawData.daily;
-        for (i = 0; i < dailyStats.length; i++) {
-          var stat = dailyStats[i];
-          var row = [new Date(stat.time), stat.nav, stat.market];
-          tData.addRow(row);
-        }
-
-        var tTimelineDiv = document.getElementById('t_timeline');
-        tChart = new google.visualization.AnnotatedTimeLine(tTimelineDiv);
-        tChart.draw(tData, {
-          'displayRangeSelector': false,
-          'displayZoomButtons': false,
-          'scaleType': 'allmaximized',
-          'scaleColumns': [0,1],
-        });
-
-        var bData = new google.visualization.DataTable();
-        bData.addColumn('date', 'Date');
-        bData.addColumn('number', 'Position Count');
-
-        var dailyStats = rawData.daily;
-        for (i = 0; i < dailyStats.length; i++) {
-          var stat = dailyStats[i];
-          var row = [new Date(stat.time), stat.positionCount];
-          bData.addRow(row);
-        }
-
-        var bTimelineDiv = document.getElementById('b_timeline');
-        bChart = new google.visualization.AnnotatedTimeLine(bTimelineDiv);
-        bChart.draw(bData);
-
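-        // Keep the NAV/market chart's visible range in sync with the position-count chart below it.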
-        google.visualization.events.addListener(
-        bChart, 'rangechange', lineChartRangeChange);
-      }
-
-      function lineChartRangeChange() {
-        var newRange = bChart.getVisibleChartRange();
-        tChart.setVisibleChartRange(newRange.start, newRange.end);
-      } 
-    </script>
-     
-  </body>
-</html>