You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:48 UTC

[17/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
new file mode 100644
index 0000000..2d5939f
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
@@ -0,0 +1,61 @@
+package org.apache.predictionio.workflow
+
+import org.apache.predictionio.controller._
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers._
+
+class EvaluationWorkflowSuite extends FunSuite with SharedSparkContext {
+
+  test("Evaluation return best engine params, simple result type: Double") {
+    val engine = new Engine1()
+    val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
+    val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+    val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+    val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
+    val engineParamsList = Seq(ep0, ep1, ep2, ep3)
+
+    val evaluator = MetricEvaluator(new Metric0())
+  
+    object Eval extends Evaluation {
+      engineEvaluator = (new Engine1(), MetricEvaluator(new Metric0()))
+    }
+
+    val result = EvaluationWorkflow.runEvaluation(
+      sc,
+      Eval,
+      engine,
+      engineParamsList,
+      evaluator,
+      WorkflowParams())
+
+    result.bestScore.score shouldBe 0.3
+    result.bestEngineParams shouldBe ep1
+  }
+
+  test("Evaluation return best engine params, complex result type") {
+    val engine = new Engine1()
+    val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
+    val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+    val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+    val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
+    val engineParamsList = Seq(ep0, ep1, ep2, ep3)
+
+    val evaluator = MetricEvaluator(new Metric1())
+    
+    object Eval extends Evaluation {
+      engineEvaluator = (new Engine1(), MetricEvaluator(new Metric1()))
+    }
+
+    val result = EvaluationWorkflow.runEvaluation(
+      sc,
+      Eval,
+      engine,
+      engineParamsList,
+      evaluator,
+      WorkflowParams())
+  
+    result.bestScore.score shouldBe Metric1.Result(0, 0.3)
+    result.bestEngineParams shouldBe ep1
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
new file mode 100644
index 0000000..217f416
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
@@ -0,0 +1,383 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.workflow
+
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.controller.Utils
+import org.json4s.CustomSerializer
+import org.json4s.JsonAST.JField
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JString
+import org.json4s.MappingException
+import org.json4s.native.JsonMethods.compact
+import org.json4s.native.JsonMethods.render
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class JsonExtractorSuite extends FunSuite with Matchers {
+
+  test("Extract Scala object using option Json4sNative works with optional and default value " +
+    "provided") {
+
+    val json = """{"string": "query string", "optional": "optional string", "default": "d"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Json4sNative,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", Some("optional string"), "d"))
+  }
+
+  test("Extract Scala object using option Json4sNative works with no optional and no default " +
+    "value provided") {
+
+    val json = """{"string": "query string"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Json4sNative,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", None, "default"))
+  }
+
+  test("Extract Scala object using option Json4sNative works with null optional and null default" +
+    " value") {
+
+    val json = """{"string": "query string", "optional": null, "default": null}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Json4sNative,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", None, "default"))
+  }
+
+  test("Extract Scala object using option Both works with optional and default value provided") {
+    // Pass JsonExtractorOption.Both so this test covers the option its name advertises.
+    val json = """{"string": "query string", "optional": "optional string", "default": "d"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Both,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", Some("optional string"), "d"))
+  }
+
+  test("Extract Scala object using option Both works with no optional and no default value " +
+    "provided") {
+    // Pass JsonExtractorOption.Both so this test covers the option its name advertises.
+    val json = """{"string": "query string"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Both,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", None, "default"))
+  }
+
+  test("Extract Scala object using option Both works with null optional and null default value") {
+    // Pass JsonExtractorOption.Both so this test covers the option its name advertises.
+    val json = """{"string": "query string", "optional": null, "default": null}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Both,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", None, "default"))
+  }
+
+  test("Extract Scala object using option Gson should not get default value and optional none" +
+    " value") {
+
+    val json = """{"string": "query string"}"""
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Gson,
+      json,
+      classOf[ScalaQuery])
+
+    query should be (ScalaQuery("query string", null, null))
+  }
+
+  test("Extract Scala object using option Gson should throw an exception with optional " +
+    "value provided") {
+
+    val json = """{"string": "query string", "optional": "o", "default": "d"}"""
+    intercept[RuntimeException] {
+      JsonExtractor.extract(
+        JsonExtractorOption.Gson,
+        json,
+        classOf[ScalaQuery])
+    }
+  }
+
+  test("Extract Java object using option Gson works") {
+
+    val json = """{"q": "query string"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Gson,
+      json,
+      classOf[JavaQuery])
+
+    query should be (new JavaQuery("query string"))
+  }
+
+  test("Extract Java object using option Both works") {
+
+    val json = """{"q": "query string"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Both,
+      json,
+      classOf[JavaQuery])
+
+    query should be (new JavaQuery("query string"))
+  }
+
+  test("Extract Java object using option Json4sNative should throw an exception") {
+
+    val json = """{"q": "query string"}"""
+
+    intercept[MappingException] {
+      JsonExtractor.extract(
+        JsonExtractorOption.Json4sNative,
+        json,
+        classOf[JavaQuery])
+    }
+  }
+
+  test("Extract Scala object using option Json4sNative with custom deserializer") {
+    val json = """{"string": "query string", "optional": "o", "default": "d"}"""
+
+    val query = JsonExtractor.extract(
+      JsonExtractorOption.Json4sNative,
+      json,
+      classOf[ScalaQuery],
+      Utils.json4sDefaultFormats + new UpperCaseFormat
+    )
+
+    query should be(ScalaQuery("QUERY STRING", Some("O"), "D"))
+  }
+
+  test("Extract Java object using option Gson with custom deserializer") {
+    val json = """{"q": "query string"}"""
+    // The custom Gson adapter factory is expected to upper-case "q" (see the assertion below).
+    val query = JsonExtractor.extract(
+      extractorOption = JsonExtractorOption.Gson,
+      json = json,
+      clazz = classOf[JavaQuery],
+      gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
+    )
+
+    query should be(new JavaQuery("QUERY STRING"))
+  }
+
+  test("Java object to JValue using option Both works") {
+    val query = new JavaQuery("query string")
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
+
+    compact(render(jValue)) should be ("""{"q":"query string"}""")
+  }
+
+  test("Java object to JValue using option Gson works") {
+    val query = new JavaQuery("query string")
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
+
+    compact(render(jValue)) should be ("""{"q":"query string"}""")
+  }
+
+  test("Java object to JValue using option Json4sNative results in empty Json") {
+    val query = new JavaQuery("query string")
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
+
+    compact(render(jValue)) should be ("""{}""")
+  }
+
+  test("Scala object to JValue using option Both works") {
+    val query = new ScalaQuery("query string", Some("option"))
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
+
+    compact(render(jValue)) should
+      be ("""{"string":"query string","optional":"option","default":"default"}""")
+  }
+
+  test("Scala object to JValue using option Gson does not serialize optional") {
+    val query = new ScalaQuery("query string", Some("option"))
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
+
+    compact(render(jValue)) should
+      be ("""{"string":"query string","optional":{},"default":"default"}""")
+  }
+
+  test("Scala object to JValue using option Json4sNative works") {
+    val query = new ScalaQuery("query string", Some("option"))
+    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
+
+    compact(render(jValue)) should
+      be ("""{"string":"query string","optional":"option","default":"default"}""")
+  }
+
+  test("Scala object to JValue using option Json4sNative with custom serializer") {
+    val query = new ScalaQuery("query string", Some("option"))
+    val jValue = JsonExtractor.toJValue(
+      JsonExtractorOption.Json4sNative,
+      query,
+      Utils.json4sDefaultFormats + new UpperCaseFormat
+    )
+
+    compact(render(jValue)) should
+      be ("""{"string":"QUERY STRING","optional":"OPTION","default":"DEFAULT"}""")
+  }
+
+  test("Java object to JValue using option Gson with custom serializer") {
+    val query = new JavaQuery("query string")
+    val jValue = JsonExtractor.toJValue(
+      extractorOption = JsonExtractorOption.Gson,
+      o = query,
+      gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
+    )
+
+    compact(render(jValue)) should be ("""{"q":"QUERY STRING"}""")
+  }
+
+  test("Java Param to Json using option Both") {
+    val param = ("algo", new JavaParams("parameter"))
+    val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
+
+    json should be ("""{"algo":{"p":"parameter"}}""")
+  }
+
+  test("Java Param to Json using option Gson") {
+    val param = ("algo", new JavaParams("parameter"))
+    val json = JsonExtractor.paramToJson(JsonExtractorOption.Gson, param)
+
+    json should be ("""{"algo":{"p":"parameter"}}""")
+  }
+
+  test("Scala Param to Json using option Both") {
+    val param = ("algo", AlgorithmParams("parameter"))
+    val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
+
+    json should be ("""{"algo":{"a":"parameter"}}""")
+  }
+
+  test("Scala Param to Json using option Json4sNative") {
+    val param = ("algo", AlgorithmParams("parameter"))
+    val json = JsonExtractor.paramToJson(JsonExtractorOption.Json4sNative, param)
+
+    json should be ("""{"algo":{"a":"parameter"}}""")
+  }
+
+  test("Java Params to Json using option Both") {
+    val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2")))
+    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+    json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
+  }
+
+  test("Java Params to Json using option Gson") {
+    val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2")))
+    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Gson, params)
+
+    json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
+  }
+
+  test("Scala Params to Json using option Both") {
+    val params =
+      Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2")))
+    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+    json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
+  }
+
+  test("Scala Params to Json using option Json4sNative") {
+    val params =
+      Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2")))
+    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Json4sNative, params)
+
+    json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
+  }
+
+  test("Mixed Java and Scala Params to Json using option Both") {
+    val params =
+      Seq(("scala", AlgorithmParams("parameter")), ("java", new JavaParams("parameter2")))
+    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+    json should be ("""[{"scala":{"a":"parameter"}},{"java":{"p":"parameter2"}}]""")
+  }
+
+  test("Serializing Scala EngineParams works using option Json4sNative") {
+    val ep = new EngineParams(
+      dataSourceParams = ("ds", DataSourceParams("dsp")),
+      algorithmParamsList = Seq(("a0", AlgorithmParams("ap"))))
+
+    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Json4sNative, ep)
+
+    json should be (
+      """{"dataSourceParams":{"ds":{"a":"dsp"}},"preparatorParams":{"":{}},""" +
+        """"algorithmParamsList":[{"a0":{"a":"ap"}}],"servingParams":{"":{}}}""")
+  }
+
+  test("Serializing Java EngineParams works using option Gson") {
+    val ep = new EngineParams(
+      dataSourceParams = ("ds", new JavaParams("dsp")),
+      algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2"))))
+
+    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Gson, ep)
+
+    json should be (
+      """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" +
+        """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
+  }
+
+  test("Serializing Java EngineParams works using option Both") {
+    val ep = new EngineParams(
+      dataSourceParams = ("ds", new JavaParams("dsp")),
+      algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2"))))
+
+    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Both, ep)
+
+    json should be (
+      """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" +
+        """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
+  }
+}
+
+private case class AlgorithmParams(a: String) extends Params
+
+private case class DataSourceParams(a: String) extends Params
+
+private case class ScalaQuery(string: String, optional: Option[String], default: String = "default")
+
+private class UpperCaseFormat extends CustomSerializer[ScalaQuery](format => ( {
+  case JObject(JField("string", JString(string)) :: // matches exactly these 3 fields, in order
+    JField("optional", JString(optional)) ::
+    JField("default", JString(default)) ::
+    Nil) => ScalaQuery(string.toUpperCase, Some(optional.toUpperCase), default.toUpperCase)
+}, {
+  case x: ScalaQuery => // upper-case every field; a None optional becomes JNothing, not a throw
+    JObject(JField("string", JString(x.string.toUpperCase)),
+      JField("optional",
+        x.optional.map(o => JString(o.toUpperCase)).getOrElse(org.json4s.JNothing)),
+      JField("default", JString(x.default.toUpperCase)))
+}))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/Utils.scala b/data/src/main/scala/io/prediction/data/Utils.scala
deleted file mode 100644
index 78b71cc..0000000
--- a/data/src/main/scala/io/prediction/data/Utils.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data
-
-import org.joda.time.DateTime
-import org.joda.time.format.ISODateTimeFormat
-
-import java.lang.IllegalArgumentException
-
-private[prediction] object Utils {
-
-  // use dateTime() for strict ISO8601 format
-  val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed()
-
-  val dateTimeNoMillisFormatter =
-    ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed()
-
-  def stringToDateTime(dt: String): DateTime = {
-    // We accept two formats.
-    // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
-    // 2. "yyyy-MM-dd'T'HH:mm:ssZZ"
-    // The first one also takes milliseconds into account.
-    try {
-      // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
-      dateTimeFormatter.parseDateTime(dt)
-    } catch {
-      case e: IllegalArgumentException => {
-        // handle when the datetime string doesn't specify milliseconds.
-        dateTimeNoMillisFormatter.parseDateTime(dt)
-      }
-    }
-  }
-
-  def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt)
-    // dt.toString
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Common.scala b/data/src/main/scala/io/prediction/data/api/Common.scala
deleted file mode 100644
index 6681a1d..0000000
--- a/data/src/main/scala/io/prediction/data/api/Common.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.ConnectorException
-import io.prediction.data.storage.StorageException
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.routing.Rejection
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-
-object Common {
-
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats
-  }
-
-  import Json4sProtocol._
-
-  val rejectionHandler = RejectionHandler {
-    case MalformedRequestContentRejection(msg, _) :: _ =>
-      complete(StatusCodes.BadRequest, Map("message" -> msg))
-    case MissingQueryParamRejection(msg) :: _ =>
-      complete(StatusCodes.NotFound,
-        Map("message" -> s"missing required query parameter ${msg}."))
-    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
-      val msg = cause match {
-        case AuthenticationFailedRejection.CredentialsRejected =>
-          "Invalid accessKey."
-        case AuthenticationFailedRejection.CredentialsMissing =>
-          "Missing accessKey."
-      }
-      complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
-    }
-    case ChannelRejection(msg) :: _ =>
-      complete(StatusCodes.Unauthorized, Map("message" -> msg))
-    case NonExistentAppRejection(msg) :: _ =>
-      complete(StatusCodes.Unauthorized, Map("message" -> msg))
-  }
-
-  val exceptionHandler = ExceptionHandler {
-    case e: ConnectorException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.BadRequest, Map("message" -> msg))
-    }
-    case e: StorageException => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-    case e: Exception => {
-      val msg = s"${e.getMessage()}"
-      complete(StatusCodes.InternalServerError, Map("message" -> msg))
-    }
-  }
-}
-
-/** invalid channel */
-case class ChannelRejection(msg: String) extends Rejection
-
-/** the app doesn't exist */
-case class NonExistentAppRejection(msg: String) extends Rejection

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventInfo.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventInfo.scala b/data/src/main/scala/io/prediction/data/api/EventInfo.scala
deleted file mode 100644
index 1e324c2..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventInfo.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-case class EventInfo(
-  appId: Int,
-  channelId: Option[Int],
-  event: Event)
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServer.scala b/data/src/main/scala/io/prediction/data/api/EventServer.scala
deleted file mode 100644
index 139f964..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServer.scala
+++ /dev/null
@@ -1,640 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import akka.event.Logging
-import sun.misc.BASE64Decoder
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
-import io.prediction.data.Utils
-import io.prediction.data.storage.AccessKeys
-import io.prediction.data.storage.Channels
-import io.prediction.data.storage.DateTimeJson4sSupport
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.BatchEventsJson4sSupport
-import io.prediction.data.storage.LEvents
-import io.prediction.data.storage.Storage
-import org.json4s.DefaultFormats
-import org.json4s.Formats
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-import spray.can.Http
-import spray.http.FormData
-import spray.http.MediaTypes
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-import spray.routing._
-import spray.routing.authentication.Authentication
-
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.{Try, Success, Failure}
-
-class  EventServiceActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends HttpServiceActor {
-
-  object Json4sProtocol extends Json4sSupport {
-    implicit def json4sFormats: Formats = DefaultFormats +
-      new EventJson4sSupport.APISerializer +
-      new BatchEventsJson4sSupport.APISerializer +
-      // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
-      // some format not converted, or timezone not correct
-      new DateTimeJson4sSupport.Serializer
-  }
-
-
-  val MaxNumberOfEventsPerBatchRequest = 50
-
-  val logger = Logging(context.system, this)
-
-  // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
-  // Futures
-  implicit def executionContext: ExecutionContext = context.dispatcher
-
-  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-  val rejectionHandler = Common.rejectionHandler
-
-  val jsonPath = """(.+)\.json$""".r
-  val formPath = """(.+)\.form$""".r
-
-  val pluginContext = EventServerPluginContext(logger)
-
-  private lazy val base64Decoder = new BASE64Decoder
-
-  case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
-
-  /* with accessKey in query/header, return appId if succeed */
-  def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
-    ctx: RequestContext =>
-      val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
-      val channelParamOpt = ctx.request.uri.query.get("channel")
-      Future {
-        // with accessKey in query, return appId if succeed
-        accessKeyParamOpt.map { accessKeyParam =>
-          accessKeysClient.get(accessKeyParam).map { k =>
-            channelParamOpt.map { ch =>
-              val channelMap =
-                channelsClient.getByAppid(k.appid)
-                .map(c => (c.name, c.id)).toMap
-              if (channelMap.contains(ch)) {
-                Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
-              } else {
-                Left(ChannelRejection(s"Invalid channel '$ch'."))
-              }
-            }.getOrElse{
-              Right(AuthData(k.appid, None, k.events))
-            }
-          }.getOrElse(FailedAuth)
-        }.getOrElse {
-          // with accessKey in header, return appId if succeed
-          ctx.request.headers.find(_.name == "Authorization").map { authHeader \u21d2
-            authHeader.value.split("Basic ") match {
-              case Array(_, value) \u21d2
-                val appAccessKey =
-                  new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
-                accessKeysClient.get(appAccessKey) match {
-                  case Some(k) \u21d2 Right(AuthData(k.appid, None, k.events))
-                  case None \u21d2 FailedAuth
-                }
-
-              case _ \u21d2 FailedAuth
-            }
-          }.getOrElse(MissedAuth)
-        }
-      }
-  }
-
-  private val FailedAuth = Left(
-    AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsRejected, List()
-    )
-  )
-
-  private val MissedAuth = Left(
-    AuthenticationFailedRejection(
-      AuthenticationFailedRejection.CredentialsMissing, List()
-    )
-  )
-
-  lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
-  lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
-
-  val route: Route =
-    pathSingleSlash {
-      import Json4sProtocol._
-
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete(Map("status" -> "alive"))
-        }
-      }
-    } ~
-    path("plugins.json") {
-      import Json4sProtocol._
-      get {
-        respondWithMediaType(MediaTypes.`application/json`) {
-          complete {
-            Map("plugins" -> Map(
-              "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
-                n -> Map(
-                  "name" -> p.pluginName,
-                  "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
-              },
-              "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
-                n -> Map(
-                  "name" -> p.pluginName,
-                  "description" -> p.pluginDescription,
-                  "class" -> p.getClass.getName)
-              }
-            ))
-          }
-        }
-      }
-    } ~
-    path("plugins" / Segments) { segments =>
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          authenticate(withAccessKey) { authData =>
-            respondWithMediaType(MediaTypes.`application/json`) {
-              complete {
-                val pluginArgs = segments.drop(2)
-                val pluginType = segments(0)
-                val pluginName = segments(1)
-                pluginType match {
-                  case EventServerPlugin.inputBlocker =>
-                    pluginContext.inputBlockers(pluginName).handleREST(
-                      authData.appId,
-                      authData.channelId,
-                      pluginArgs)
-                  case EventServerPlugin.inputSniffer =>
-                    pluginsActorRef ? PluginsActor.HandleREST(
-                      appId = authData.appId,
-                      channelId = authData.channelId,
-                      pluginName = pluginName,
-                      pluginArgs = pluginArgs) map {
-                      _.asInstanceOf[String]
-                    }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("events" / jsonPath ) { eventId =>
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"GET event ${eventId}.")
-                  val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
-                    eventOpt.map( event =>
-                      (StatusCodes.OK, event)
-                    ).getOrElse(
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    )
-                  }
-                  data
-                }
-              }
-            }
-          }
-        }
-      } ~
-      delete {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  logger.debug(s"DELETE event ${eventId}.")
-                  val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
-                    if (found) {
-                      (StatusCodes.OK, Map("message" -> "Found"))
-                    } else {
-                      (StatusCodes.NotFound, Map("message" -> "Not Found"))
-                    }
-                  }
-                  data
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              val events = authData.events
-              entity(as[Event]) { event =>
-                complete {
-                  if (events.isEmpty || authData.events.contains(event.event)) {
-                    pluginContext.inputBlockers.values.foreach(
-                      _.process(EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event), pluginContext))
-                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-                      pluginsActorRef ! EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event)
-                      val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-                      if (config.stats) {
-                        statsActorRef ! Bookkeeping(appId, result._1, event)
-                      }
-                      result
-                    }
-                    data
-                  } else {
-                    (StatusCodes.Forbidden,
-                      Map("message" -> s"${event.event} events are not allowed"))
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              parameters(
-                'startTime.as[Option[String]],
-                'untilTime.as[Option[String]],
-                'entityType.as[Option[String]],
-                'entityId.as[Option[String]],
-                'event.as[Option[String]],
-                'targetEntityType.as[Option[String]],
-                'targetEntityId.as[Option[String]],
-                'limit.as[Option[Int]],
-                'reversed.as[Option[Boolean]]) {
-                (startTimeStr, untilTimeStr, entityType, entityId,
-                  eventName,  // only support one event name
-                  targetEntityType, targetEntityId,
-                  limit, reversed) =>
-                respondWithMediaType(MediaTypes.`application/json`) {
-                  complete {
-                    logger.debug(
-                      s"GET events of appId=${appId} " +
-                      s"st=${startTimeStr} ut=${untilTimeStr} " +
-                      s"et=${entityType} eid=${entityId} " +
-                      s"li=${limit} rev=${reversed} ")
-
-                    require(!((reversed == Some(true))
-                      && (entityType.isEmpty || entityId.isEmpty)),
-                      "the parameter reversed can only be used with" +
-                      " both entityType and entityId specified.")
-
-                    val parseTime = Future {
-                      val startTime = startTimeStr.map(Utils.stringToDateTime(_))
-                      val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
-                      (startTime, untilTime)
-                    }
-
-
-                    parseTime.flatMap { case (startTime, untilTime) =>
-                      val data = eventClient.futureFind(
-                        appId = appId,
-                        channelId = channelId,
-                        startTime = startTime,
-                        untilTime = untilTime,
-                        entityType = entityType,
-                        entityId = entityId,
-                        eventNames = eventName.map(List(_)),
-                        targetEntityType = targetEntityType.map(Some(_)),
-                        targetEntityId = targetEntityId.map(Some(_)),
-                        limit = limit.orElse(Some(20)),
-                        reversed = reversed)
-                        .map { eventIter =>
-                          if (eventIter.hasNext) {
-                            (StatusCodes.OK, eventIter.toArray)
-                          } else {
-                            (StatusCodes.NotFound,
-                              Map("message" -> "Not Found"))
-                          }
-                        }
-                      data
-                    }.recover {
-                      case e: Exception =>
-                        (StatusCodes.BadRequest, Map("message" -> s"${e}"))
-                    }
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("batch" / "events.json") {
-
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              val allowedEvents = authData.events
-              val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
-                case Success(event) => {
-                  if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
-                    pluginContext.inputBlockers.values.foreach(
-                      _.process(EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event), pluginContext))
-                    val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-                      pluginsActorRef ! EventInfo(
-                        appId = appId,
-                        channelId = channelId,
-                        event = event)
-                      val status = StatusCodes.Created
-                      val result = Map(
-                        "status" -> status.intValue,
-                        "eventId" -> s"${id}")
-                      if (config.stats) {
-                        statsActorRef ! Bookkeeping(appId, status, event)
-                      }
-                      result
-                    }.recover { case exception =>
-                      Map(
-                        "status" -> StatusCodes.InternalServerError.intValue,
-                        "message" -> s"${exception.getMessage()}")
-                    }
-                    data
-                  } else {
-                    Future.successful(Map(
-                      "status" -> StatusCodes.Forbidden.intValue,
-                      "message" -> s"${event.event} events are not allowed"))
-                  }
-                }
-                case Failure(exception) => {
-                  Future.successful(Map(
-                    "status" -> StatusCodes.BadRequest.intValue,
-                    "message" -> s"${exception.getMessage()}"))
-                }
-              }
-
-              entity(as[Seq[Try[Event]]]) { events =>
-                complete {
-                  if (events.length <= MaxNumberOfEventsPerBatchRequest) {
-                    Future.traverse(events)(handleEvent)
-                  } else {
-                    (StatusCodes.BadRequest,
-                      Map("message" -> (s"Batch request must have less than or equal to " +
-                        s"${MaxNumberOfEventsPerBatchRequest} events")))
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("stats.json") {
-
-      import Json4sProtocol._
-
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                if (config.stats) {
-                  complete {
-                    statsActorRef ? GetStats(appId) map {
-                      _.asInstanceOf[Map[String, StatsSnapshot]]
-                    }
-                  }
-                } else {
-                  complete(
-                    StatusCodes.NotFound,
-                    parse("""{"message": "To see stats, launch Event Server """ +
-                      """with --stats argument."}"""))
-                }
-              }
-            }
-          }
-        }
-      }  // stats.json get
-    } ~
-    path("webhooks" / jsonPath ) { web =>
-      import Json4sProtocol._
-
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[JObject]) { jObj =>
-                  complete {
-                    Webhooks.postJson(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = jObj,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  Webhooks.getJson(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
-              }
-            }
-          }
-        }
-      }
-    } ~
-    path("webhooks" / formPath ) { web =>
-      post {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                entity(as[FormData]){ formData =>
-                  // logger.debug(formData.toString)
-                  complete {
-                    // respond with JSON
-                    import Json4sProtocol._
-
-                    Webhooks.postForm(
-                      appId = appId,
-                      channelId = channelId,
-                      web = web,
-                      data = formData,
-                      eventClient = eventClient,
-                      log = logger,
-                      stats = config.stats,
-                      statsActorRef = statsActorRef)
-                  }
-                }
-              }
-            }
-          }
-        }
-      } ~
-      get {
-        handleExceptions(Common.exceptionHandler) {
-          handleRejections(rejectionHandler) {
-            authenticate(withAccessKey) { authData =>
-              val appId = authData.appId
-              val channelId = authData.channelId
-              respondWithMediaType(MediaTypes.`application/json`) {
-                complete {
-                  // respond with JSON
-                  import Json4sProtocol._
-
-                  Webhooks.getForm(
-                    appId = appId,
-                    channelId = channelId,
-                    web = web,
-                    log = logger)
-                }
-              }
-            }
-          }
-        }
-      }
-
-    }
-
-  def receive: Actor.Receive = runRoute(route)
-}
-
-
-
-/* message */
-case class StartServer(host: String, port: Int)
-
-class EventServerActor(
-    val eventClient: LEvents,
-    val accessKeysClient: AccessKeys,
-    val channelsClient: Channels,
-    val config: EventServerConfig) extends Actor with ActorLogging {
-  val child = context.actorOf(
-    Props(classOf[EventServiceActor],
-      eventClient,
-      accessKeysClient,
-      channelsClient,
-      config),
-    "EventServiceActor")
-  implicit val system = context.system
-
-  def receive: Actor.Receive = {
-    case StartServer(host, portNum) => {
-      IO(Http) ! Http.Bind(child, interface = host, port = portNum)
-    }
-    case m: Http.Bound => log.info("Bound received. EventServer is ready.")
-    case m: Http.CommandFailed => log.error("Command failed.")
-    case _ => log.error("Unknown message.")
-  }
-}
-
-case class EventServerConfig(
-  ip: String = "localhost",
-  port: Int = 7070,
-  plugins: String = "plugins",
-  stats: Boolean = false)
-
-object EventServer {
-  def createEventServer(config: EventServerConfig): Unit = {
-    implicit val system = ActorSystem("EventServerSystem")
-
-    val eventClient = Storage.getLEvents()
-    val accessKeysClient = Storage.getMetaDataAccessKeys()
-    val channelsClient = Storage.getMetaDataChannels()
-
-    val serverActor = system.actorOf(
-      Props(
-        classOf[EventServerActor],
-        eventClient,
-        accessKeysClient,
-        channelsClient,
-        config),
-      "EventServerActor"
-    )
-    if (config.stats) system.actorOf(Props[StatsActor], "StatsActor")
-    system.actorOf(Props[PluginsActor], "PluginsActor")
-    serverActor ! StartServer(config.ip, config.port)
-    system.awaitTermination()
-  }
-}
-
-object Run {
-  def main(args: Array[String]) {
-    EventServer.createEventServer(EventServerConfig(
-      ip = "0.0.0.0",
-      port = 7070))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala b/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
deleted file mode 100644
index a87fc84..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-trait EventServerPlugin {
-  val pluginName: String
-  val pluginDescription: String
-  val pluginType: String
-
-  def start(context: EventServerPluginContext): Unit
-
-  def process(eventInfo: EventInfo, context: EventServerPluginContext)
-
-  def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String
-}
-
-object EventServerPlugin {
-  val inputBlocker = "inputblocker"
-  val inputSniffer = "inputsniffer"
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala b/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
deleted file mode 100644
index 1d8d36e..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import java.util.ServiceLoader
-
-import akka.event.LoggingAdapter
-import grizzled.slf4j.Logging
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-
-class EventServerPluginContext(
-    val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]],
-    val log: LoggingAdapter) {
-  def inputBlockers: Map[String, EventServerPlugin] =
-    plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap
-
-  def inputSniffers: Map[String, EventServerPlugin] =
-    plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap
-}
-
-object EventServerPluginContext extends Logging {
-  def apply(log: LoggingAdapter): EventServerPluginContext = {
-    val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]](
-      EventServerPlugin.inputBlocker -> mutable.Map(),
-      EventServerPlugin.inputSniffer -> mutable.Map())
-    val serviceLoader = ServiceLoader.load(classOf[EventServerPlugin])
-    serviceLoader foreach { service =>
-      plugins(service.pluginType) += service.pluginName -> service
-    }
-    new EventServerPluginContext(
-      plugins,
-      log)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala b/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
deleted file mode 100644
index 7883adf..0000000
--- a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import akka.actor.Actor
-import akka.event.Logging
-
-class PluginsActor() extends Actor {
-  implicit val system = context.system
-  val log = Logging(system, this)
-
-  val pluginContext = EventServerPluginContext(log)
-
-  def receive: PartialFunction[Any, Unit] = {
-    case e: EventInfo =>
-      pluginContext.inputSniffers.values.foreach(_.process(e, pluginContext))
-    case h: PluginsActor.HandleREST =>
-      try {
-        sender() ! pluginContext.inputSniffers(h.pluginName).handleREST(
-          h.appId,
-          h.channelId,
-          h.pluginArgs)
-      } catch {
-        case e: Exception =>
-          sender() ! s"""{"message":"${e.getMessage}"}"""
-      }
-    case _ =>
-      log.error("Unknown message sent to Event Server input sniffer plugin host.")
-  }
-}
-
-object PluginsActor {
-  case class HandleREST(
-    pluginName: String,
-    appId: Int,
-    channelId: Option[Int],
-    pluginArgs: Seq[String])
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Stats.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Stats.scala b/data/src/main/scala/io/prediction/data/api/Stats.scala
deleted file mode 100644
index ca5f05e..0000000
--- a/data/src/main/scala/io/prediction/data/api/Stats.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-import spray.http.StatusCode
-
-import scala.collection.mutable.{ HashMap => MHashMap }
-import scala.collection.mutable
-
-import com.github.nscala_time.time.Imports.DateTime
-
-case class EntityTypesEvent(
-  val entityType: String,
-  val targetEntityType: Option[String],
-  val event: String) {
-
-  def this(e: Event) = this(
-    e.entityType,
-    e.targetEntityType,
-    e.event)
-}
-
-case class KV[K, V](key: K, value: V)
-
-case class StatsSnapshot(
-  val startTime: DateTime,
-  val endTime: Option[DateTime],
-  val basic: Seq[KV[EntityTypesEvent, Long]],
-  val statusCode: Seq[KV[StatusCode, Long]]
-)
-
-
-class Stats(val startTime: DateTime) {
-  private[this] var _endTime: Option[DateTime] = None
-  var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L)
-  var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L)
-
-  def cutoff(endTime: DateTime) {
-    _endTime = Some(endTime)
-  }
-
-  def update(appId: Int, statusCode: StatusCode, event: Event) {
-    statusCodeCount((appId, statusCode)) += 1
-    eteCount((appId, new EntityTypesEvent(event))) += 1
-  }
-
-  def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V])
-  : Seq[KV[K, V]] = {
-    m
-    .toSeq
-    .flatMap { case (k, v) =>
-      if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() }
-    }
-  }
-
-  def get(appId: Int): StatsSnapshot = {
-    StatsSnapshot(
-      startTime,
-      _endTime,
-      extractByAppId(appId, eteCount),
-      extractByAppId(appId, statusCodeCount)
-    )
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/StatsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/StatsActor.scala b/data/src/main/scala/io/prediction/data/api/StatsActor.scala
deleted file mode 100644
index 857352f..0000000
--- a/data/src/main/scala/io/prediction/data/api/StatsActor.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-import spray.http.StatusCode
-
-import akka.actor.Actor
-import akka.event.Logging
-
-import com.github.nscala_time.time.Imports.DateTime
-
-/* message to StatsActor */
-case class Bookkeeping(val appId: Int, statusCode: StatusCode, event: Event)
-
-/* message to StatsActor */
-case class GetStats(val appId: Int)
-
-class StatsActor extends Actor {
-  implicit val system = context.system
-  val log = Logging(system, this)
-
-  def getCurrent: DateTime = {
-    DateTime.now.
-      withMinuteOfHour(0).
-      withSecondOfMinute(0).
-      withMillisOfSecond(0)
-  }
-
-  var longLiveStats = new Stats(DateTime.now)
-  var hourlyStats = new Stats(getCurrent)
-
-  var prevHourlyStats = new Stats(getCurrent.minusHours(1))
-  prevHourlyStats.cutoff(hourlyStats.startTime)
-
-  def bookkeeping(appId: Int, statusCode: StatusCode, event: Event) {
-    val current = getCurrent
-    // If the current hour is different from the stats start time, we create
-    // another stats instance, and move the current to prev.
-    if (current != hourlyStats.startTime) {
-      prevHourlyStats = hourlyStats
-      prevHourlyStats.cutoff(current)
-      hourlyStats = new Stats(current)
-    }
-
-    hourlyStats.update(appId, statusCode, event)
-    longLiveStats.update(appId, statusCode, event)
-  }
-
-  def receive: Actor.Receive = {
-    case Bookkeeping(appId, statusCode, event) =>
-      bookkeeping(appId, statusCode, event)
-    case GetStats(appId) => sender() ! Map(
-      "time" -> DateTime.now,
-      "currentHour" -> hourlyStats.get(appId),
-      "prevHour" -> prevHourlyStats.get(appId),
-      "longLive" -> longLiveStats.get(appId))
-    case _ => log.error("Unknown message.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Webhooks.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Webhooks.scala b/data/src/main/scala/io/prediction/data/api/Webhooks.scala
deleted file mode 100644
index ff18888..0000000
--- a/data/src/main/scala/io/prediction/data/api/Webhooks.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorUtil
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.LEvents
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.http.FormData
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-
-import akka.event.LoggingAdapter
-import akka.actor.ActorSelection
-
-import scala.concurrent.{ExecutionContext, Future}
-
-
-private[prediction] object Webhooks {
-
-  def postJson(
-    appId: Int,
-    channelId: Option[Int],
-    web: String,
-    data: JObject,
-    eventClient: LEvents,
-    log: LoggingAdapter,
-    stats: Boolean,
-    statsActorRef: ActorSelection
-  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
-
-    val eventFuture = Future {
-      WebhooksConnectors.json.get(web).map { connector =>
-        ConnectorUtil.toEvent(connector, data)
-      }
-    }
-
-    eventFuture.flatMap { eventOpt =>
-      if (eventOpt.isEmpty) {
-        Future successful {
-          val message = s"webhooks connection for ${web} is not supported."
-          (StatusCodes.NotFound, Map("message" -> message))
-        }
-      } else {
-        val event = eventOpt.get
-        val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-          val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-
-          if (stats) {
-            statsActorRef ! Bookkeeping(appId, result._1, event)
-          }
-          result
-        }
-        data
-      }
-    }
-  }
-
-  def getJson(
-    appId: Int,
-    channelId: Option[Int],
-    web: String,
-    log: LoggingAdapter
-  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
-    Future {
-      WebhooksConnectors.json.get(web).map { connector =>
-        (StatusCodes.OK, Map("message" -> "Ok"))
-      }.getOrElse {
-        val message = s"webhooks connection for ${web} is not supported."
-        (StatusCodes.NotFound, Map("message" -> message))
-      }
-    }
-  }
-
-  def postForm(
-    appId: Int,
-    channelId: Option[Int],
-    web: String,
-    data: FormData,
-    eventClient: LEvents,
-    log: LoggingAdapter,
-    stats: Boolean,
-    statsActorRef: ActorSelection
-  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
-    val eventFuture = Future {
-      WebhooksConnectors.form.get(web).map { connector =>
-        ConnectorUtil.toEvent(connector, data.fields.toMap)
-      }
-    }
-
-    eventFuture.flatMap { eventOpt =>
-      if (eventOpt.isEmpty) {
-        Future {
-          val message = s"webhooks connection for ${web} is not supported."
-          (StatusCodes.NotFound, Map("message" -> message))
-        }
-      } else {
-        val event = eventOpt.get
-        val data = eventClient.futureInsert(event, appId, channelId).map { id =>
-          val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-
-          if (stats) {
-            statsActorRef ! Bookkeeping(appId, result._1, event)
-          }
-          result
-        }
-        data
-      }
-    }
-  }
-
-  def getForm(
-    appId: Int,
-    channelId: Option[Int],
-    web: String,
-    log: LoggingAdapter
-  )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
-    Future {
-      WebhooksConnectors.form.get(web).map { connector =>
-        (StatusCodes.OK, Map("message" -> "Ok"))
-      }.getOrElse {
-        val message = s"webhooks connection for ${web} is not supported."
-        (StatusCodes.NotFound, Map("message" -> message))
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala b/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
deleted file mode 100644
index 97c9775..0000000
--- a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.FormConnector
-
-import io.prediction.data.webhooks.segmentio.SegmentIOConnector
-import io.prediction.data.webhooks.mailchimp.MailChimpConnector
-
-/** Registry of the webhooks connectors available to the data API, keyed by
-  * connector name.
-  */
-private[prediction] object WebhooksConnectors {
-
-  /** Connectors of type `JsonConnector`, looked up by name. */
-  val json: Map[String, JsonConnector] =
-    Map("segmentio" -> SegmentIOConnector)
-
-  /** Connectors of type `FormConnector`, looked up by name. */
-  val form: Map[String, FormConnector] =
-    Map("mailchimp" -> MailChimpConnector)
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/package.scala b/data/src/main/scala/io/prediction/data/package.scala
deleted file mode 100644
index afbe573..0000000
--- a/data/src/main/scala/io/prediction/data/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction
-
-/** Provides data access for PredictionIO and any engines running on top of
-  * PredictionIO.
-  *
-  * The package object itself declares no members; it only carries this
-  * package-level documentation.
-  */
-package object data {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala b/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
deleted file mode 100644
index f197e78..0000000
--- a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import java.security.SecureRandom
-
-import io.prediction.annotation.DeveloperApi
-import org.apache.commons.codec.binary.Base64
-
-/** :: DeveloperApi ::
-  * Stores mapping of access keys, app IDs, and lists of allowed event names
-  *
-  * NOTE(review): whether an empty `events` list means "every event is allowed"
-  * is not visible from this declaration - confirm against the callers.
-  *
-  * @param key Access key
-  * @param appid App ID
-  * @param events List of allowed events for this particular app key
-  * @group Meta Data
-  */
-@DeveloperApi
-case class AccessKey(
-  key: String,
-  appid: Int,
-  events: Seq[String])
-
-/** :: DeveloperApi ::
-  * Base trait of the [[AccessKey]] data access object
-  *
-  * All storage operations are abstract; only [[generateKey]] comes with a
-  * default implementation.
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait AccessKeys {
-  /** Insert a new [[AccessKey]]. If the key field is empty, a key will be
-    * generated.
-    */
-  def insert(k: AccessKey): Option[String]
-
-  /** Look up an [[AccessKey]] by its key string */
-  def get(k: String): Option[AccessKey]
-
-  /** Retrieve every stored [[AccessKey]] */
-  def getAll(): Seq[AccessKey]
-
-  /** Retrieve every [[AccessKey]] belonging to the given app ID */
-  def getByAppid(appid: Int): Seq[AccessKey]
-
-  /** Update an [[AccessKey]] */
-  def update(k: AccessKey): Unit
-
-  /** Delete the [[AccessKey]] identified by the given key string */
-  def delete(k: String): Unit
-
-  /** Default implementation of key generation: fills 48 bytes from
-    * `SecureRandom.getInstanceStrong` and returns them as a URL-safe Base64
-    * string.
-    */
-  def generateKey: String = {
-    val randomBytes = new Array[Byte](48)
-    SecureRandom.getInstanceStrong.nextBytes(randomBytes)
-    Base64.encodeBase64URLSafeString(randomBytes)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Apps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Apps.scala b/data/src/main/scala/io/prediction/data/storage/Apps.scala
deleted file mode 100644
index 32343e1..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Apps.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-
-/** :: DeveloperApi ::
-  * Stores mapping of app IDs and names
-  *
-  * @param id ID of the app.
-  * @param name Name of the app.
-  * @param description Long description of the app; optional, so it may be
-  *                    `None`.
-  * @group Meta Data
-  */
-@DeveloperApi
-case class App(
-  id: Int,
-  name: String,
-  description: Option[String])
-
-/** :: DeveloperApi ::
-  * Base trait of the [[App]] data access object
-  *
-  * Every member is abstract; the trait only declares the operations that a
-  * concrete storage backend has to provide.
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait Apps {
-  /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0.
-    *
-    * @param app the app to insert
-    * @return the app ID wrapped in an Option. NOTE(review): presumably `None`
-    *         signals that the insert failed - confirm against implementations.
-    */
-  def insert(app: App): Option[Int]
-
-  /** Get an [[App]] by app ID
-    *
-    * @param id the app ID to look up
-    * @return the matching app, if any
-    */
-  def get(id: Int): Option[App]
-
-  /** Get an [[App]] by app name
-    *
-    * @param name the app name to look up
-    * @return the matching app, if any
-    */
-  def getByName(name: String): Option[App]
-
-  /** Get all [[App]]s */
-  def getAll(): Seq[App]
-
-  /** Update an [[App]]
-    *
-    * @param app the app carrying the new values
-    */
-  def update(app: App): Unit
-
-  /** Delete an [[App]]
-    *
-    * @param id ID of the app to delete
-    */
-  def delete(id: Int): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/BiMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/BiMap.scala b/data/src/main/scala/io/prediction/data/storage/BiMap.scala
deleted file mode 100644
index cbf3e12..0000000
--- a/data/src/main/scala/io/prediction/data/storage/BiMap.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import scala.collection.immutable.HashMap
-
-import org.apache.spark.rdd.RDD
-
-/** Immutable bi-directional map.
-  *
-  * Wraps a forward `Map[K, V]` and exposes the reversed mapping through
-  * `inverse`. Construction fails (via `require`) when two keys share the same
-  * value, because the reversed map could not hold both entries.
-  */
-class BiMap[K, V] private[prediction] (
-  private val m: Map[K, V],
-  private val i: Option[BiMap[V, K]] = None
-  ) extends Serializable {
-
-  // NOTE: the inverse's inverse is wired back to this instance, so the reversed
-  // map is only built and validated when no inverse was handed in.
-  val inverse: BiMap[V, K] = i match {
-    case Some(existing) => existing
-    case None =>
-      val swapped = m.map { case (key, value) => (value, key) }
-      require(swapped.size == m.size,
-        s"Failed to create reversed map. Cannot have duplicated values.")
-      new BiMap(swapped, Some(this))
-  }
-
-  /** Optionally returns the value associated with `k`. */
-  def get(k: K): Option[V] = m.get(k)
-
-  /** Returns the value associated with `k`, or `default` if there is none. */
-  def getOrElse(k: K, default: => V): V = m.getOrElse(k, default)
-
-  /** Tests whether `k` is a key of this map. */
-  def contains(k: K): Boolean = m.contains(k)
-
-  /** Returns the value associated with `k`, delegating to the underlying
-    * map's `apply`.
-    */
-  def apply(k: K): V = m(k)
-
-  /** Converts to a map.
-    * @return the underlying forward map of type immutable.Map[K, V]
-    */
-  def toMap: Map[K, V] = m
-
-  /** Converts to a sequence.
-    * @return a sequence containing all (key, value) pairs of this map
-    */
-  def toSeq: Seq[(K, V)] = m.toSeq
-
-  /** Number of entries in this map. */
-  def size: Int = m.size
-
-  /** Builds a new BiMap from the first `n` entries of the underlying map. */
-  def take(n: Int): BiMap[K, V] = BiMap(m.take(n))
-
-  override def toString: String = m.toString
-}
-
-/** Factory methods for [[BiMap]]. */
-object BiMap {
-
-  /** Wraps an existing map in a BiMap. */
-  def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x)
-
-  /** Pairs every key with its position in `keys` (starting from 0), converts
-    * the position with `toIndex` and wraps the resulting HashMap in a BiMap.
-    */
-  private[this] def indexed[V](keys: Seq[String])(toIndex: Int => V): BiMap[String, V] = {
-    val pairs = keys.zipWithIndex.map { case (key, pos) => (key, toIndex(pos)) }
-    new BiMap[String, V](HashMap(pairs: _*))
-  }
-
-  /** Create a BiMap[String, Long] from a set of String. The Long index starts
-    * from 0.
-    * @param keys a set of String
-    * @return a String to Long BiMap
-    */
-  def stringLong(keys: Set[String]): BiMap[String, Long] =
-    indexed[Long](keys.toSeq)(_.toLong)
-
-  /** Create a BiMap[String, Long] from an array of String.
-    * NOTE: the array cannot have duplicated elements.
-    * The Long index starts from 0.
-    * @param keys an array of String
-    * @return a String to Long BiMap
-    */
-  def stringLong(keys: Array[String]): BiMap[String, Long] =
-    indexed[Long](keys.toSeq)(_.toLong)
-
-  /** Create a BiMap[String, Long] from RDD[String]. The distinct keys are
-    * collected first. The Long index starts from 0.
-    * @param keys RDD of String
-    * @return a String to Long BiMap
-    */
-  def stringLong(keys: RDD[String]): BiMap[String, Long] =
-    stringLong(keys.distinct().collect())
-
-  /** Create a BiMap[String, Int] from a set of String. The Int index starts
-    * from 0.
-    * @param keys a set of String
-    * @return a String to Int BiMap
-    */
-  def stringInt(keys: Set[String]): BiMap[String, Int] =
-    indexed[Int](keys.toSeq)(identity)
-
-  /** Create a BiMap[String, Int] from an array of String.
-    * NOTE: the array cannot have duplicated elements.
-    * The Int index starts from 0.
-    * @param keys an array of String
-    * @return a String to Int BiMap
-    */
-  def stringInt(keys: Array[String]): BiMap[String, Int] =
-    indexed[Int](keys.toSeq)(identity)
-
-  /** Create a BiMap[String, Int] from RDD[String]. The distinct keys are
-    * collected first. The Int index starts from 0.
-    * @param keys RDD of String
-    * @return a String to Int BiMap
-    */
-  def stringInt(keys: RDD[String]): BiMap[String, Int] =
-    stringInt(keys.distinct().collect())
-
-  /** Create a BiMap[String, Double] from a set of String. The Double index
-    * starts from 0.
-    * @param keys a set of String
-    * @return a String to Double BiMap
-    */
-  def stringDouble(keys: Set[String]): BiMap[String, Double] =
-    indexed[Double](keys.toSeq)(_.toDouble)
-
-  /** Create a BiMap[String, Double] from an array of String.
-    * NOTE: the array cannot have duplicated elements.
-    * The Double index starts from 0.
-    * @param keys an array of String
-    * @return a String to Double BiMap
-    */
-  def stringDouble(keys: Array[String]): BiMap[String, Double] =
-    indexed[Double](keys.toSeq)(_.toDouble)
-
-  /** Create a BiMap[String, Double] from RDD[String]. The distinct keys are
-    * collected first. The Double index starts from 0.
-    * @param keys RDD of String
-    * @return a String to Double BiMap
-    */
-  def stringDouble(keys: RDD[String]): BiMap[String, Double] =
-    stringDouble(keys.distinct().collect())
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Channels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Channels.scala b/data/src/main/scala/io/prediction/data/storage/Channels.scala
deleted file mode 100644
index 3fa7aef..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Channels.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-
-/** :: DeveloperApi ::
-  * Stores mapping of channel IDs, names and app ID
-  *
-  * Construction fails with the `IllegalArgumentException` raised by `require`
-  * when `name` does not satisfy `Channel.isValidName`.
-  *
-  * @param id ID of the channel
-  * @param name Name of the channel (must be unique within the same app)
-  * @param appid ID of the app which this channel belongs to
-  * @group Meta Data
-  */
-@DeveloperApi
-case class Channel(
-  id: Int,
-  name: String, // must be unique within the same app
-  appid: Int
-) {
-  // The `s` interpolator is required here: without it the message would show
-  // the literal text "${name}" instead of the rejected name and the constraint.
-  require(Channel.isValidName(name),
-    s"Invalid channel name: ${name}. ${Channel.nameConstraint}")
-}
-
-/** :: DeveloperApi ::
-  * Companion object of [[Channel]]
-  *
-  * Holds the channel-name validation rule and the matching human-readable
-  * description of that rule.
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-object Channel {
-  /** For consistent error message display */
-  val nameConstraint: String =
-    "Only alphanumeric and - characters are allowed and max length is 16."
-
-  /** Examine whether the supplied channel name is valid. A valid channel name
-    * must consist of 1 to 16 alphanumeric and '-' characters.
-    *
-    * @param s Channel name to examine
-    * @return true if channel name is valid, false otherwise
-    */
-  // note: update nameConstraint if this rule is changed
-  def isValidName(s: String): Boolean = s.matches("^[a-zA-Z0-9-]{1,16}$")
-}
-
-/** :: DeveloperApi ::
-  * Base trait of the [[Channel]] data access object
-  *
-  * Every member is abstract; the trait only declares the operations that a
-  * concrete storage backend has to provide.
-  *
-  * @group Meta Data
-  */
-@DeveloperApi
-trait Channels {
-  /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0.
-    *
-    * @param channel the channel to insert
-    * @return the channel ID wrapped in an Option. NOTE(review): presumably
-    *         `None` signals that the insert failed - confirm against
-    *         implementations.
-    */
-  def insert(channel: Channel): Option[Int]
-
-  /** Get a [[Channel]] by channel ID
-    *
-    * @param id the channel ID to look up
-    * @return the matching channel, if any
-    */
-  def get(id: Int): Option[Channel]
-
-  /** Get all [[Channel]] by app ID
-    *
-    * @param appid ID of the app whose channels are requested
-    */
-  def getByAppid(appid: Int): Seq[Channel]
-
-  /** Delete a [[Channel]]
-    *
-    * @param id ID of the channel to delete
-    */
-  def delete(id: Int): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/DataMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/DataMap.scala b/data/src/main/scala/io/prediction/data/storage/DataMap.scala
deleted file mode 100644
index 91a0ba5..0000000
--- a/data/src/main/scala/io/prediction/data/storage/DataMap.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.json4s._
-import org.json4s.native.JsonMethods.parse
-
-import scala.collection.GenTraversableOnce
-import scala.collection.JavaConversions
-
-/** Exception class for [[DataMap]]
-  *
-  * The auxiliary constructor that takes only a message passes `null` as the
-  * cause.
-  *
-  * @param msg Error message
-  * @param cause Underlying exception, or `null` when there is none
-  * @group Event Data
-  */
-case class DataMapException(msg: String, cause: Exception)
-  extends Exception(msg, cause) {
-  def this(msg: String) = this(msg, null)
-}
-
-/** A DataMap stores properties of the event or entity. Internally it is a Map
-  * whose keys are property names and values are the corresponding JSON values.
-  * Use `get` to retrieve the value of a mandatory property or `getOpt` to
-  * retrieve the value of an optional property.
-  *
-  * @param fields Map of property name to JValue
-  * @group Event Data
-  */
-class DataMap (
-  val fields: Map[String, JValue]
-) extends Serializable {
-  // Marked @transient, so it is not serialized along with the instance.
-  @transient lazy implicit private val formats = DefaultFormats +
-    new DateTimeJson4sSupport.Serializer
-
-  /** Check the existence of a required property name. Throw a
-    * [[DataMapException]] if it does not exist.
-    *
-    * @param name The property name
-    */
-  def require(name: String): Unit =
-    if (!fields.contains(name)) throw new DataMapException(s"The field $name is required.")
-
-  /** Check if this DataMap contains a specific property.
-    *
-    * @param name The property name
-    * @return true if the property exists, else false.
-    */
-  def contains(name: String): Boolean = fields.contains(name)
-
-  /** Get the value of a mandatory property. A [[DataMapException]] is thrown
-    * if the property does not exist or if its value is JSON null.
-    *
-    * @tparam T The type of the property value
-    * @param name The property name
-    * @return the property value of type T
-    */
-  def get[T: Manifest](name: String): T = {
-    require(name)
-    fields(name) match {
-      case JNull =>
-        throw new DataMapException(s"The required field $name cannot be null.")
-      case present: JValue =>
-        present.extract[T]
-    }
-  }
-
-  /** Get the value of an optional property. Return None if the property does
-    * not exist or if it extracts to None.
-    *
-    * @tparam T The type of the property value
-    * @param name The property name
-    * @return the property value of type Option[T]
-    */
-  def getOpt[T: Manifest](name: String): Option[T] =
-    for {
-      raw <- fields.get(name)
-      value <- raw.extract[Option[T]]
-    } yield value
-
-  /** Get the value of an optional property. Return the default value if
-    * `getOpt` yields None.
-    *
-    * @tparam T The type of the property value
-    * @param name The property name
-    * @param default The default property value of type T
-    * @return the property value of type T
-    */
-  def getOrElse[T: Manifest](name: String, default: T): T =
-    getOpt[T](name) match {
-      case Some(value) => value
-      case None => default
-    }
-
-  /** Java-friendly method for getting the value of a property. Return null if
-    * the property does not exist or is JSON null.
-    *
-    * @tparam T The type of the property value
-    * @param name The property name
-    * @param clazz The class of the type of the property value
-    * @return the property value of type T
-    */
-  def get[T](name: String, clazz: java.lang.Class[T]): T = {
-    // Hand-built Manifest so that Java callers only need to supply a Class.
-    val classBacked = new Manifest[T] {
-      override def erasure: Class[_] = clazz
-      override def runtimeClass: Class[_] = clazz
-    }
-
-    fields.get(name) match {
-      case Some(value) if value != JNull => value.extract[T](formats, classBacked)
-      case _ => null.asInstanceOf[T]
-    }
-  }
-
-  /** Java-friendly method for getting a list of values of a property. Return
-    * null if the property does not exist or is JSON null.
-    *
-    * @param name The property name
-    * @return the list of property values
-    */
-  def getStringList(name: String): java.util.List[String] =
-    fields.get(name) match {
-      case Some(value) if value != JNull =>
-        val strings = value.extract[List[String]](formats, manifest[List[String]])
-        JavaConversions.seqAsJavaList(strings)
-      case _ => null
-    }
-
-  /** Return a new DataMap holding the fields of the left hand side operand
-    * combined (with the underlying map's `++`) with the fields of the right
-    * hand side operand.
-    *
-    * @param that Right hand side DataMap
-    * @return A new DataMap
-    */
-  def ++ (that: DataMap): DataMap = DataMap(fields ++ that.fields)
-
-  /** Creates a new DataMap from this DataMap by removing all elements of
-    * another collection.
-    *
-    * @param that A collection containing the removed property names
-    * @return A new DataMap
-    */
-  def -- (that: GenTraversableOnce[String]): DataMap = DataMap(fields -- that)
-
-  /** Tests whether the DataMap is empty.
-    *
-    * @return true if the DataMap is empty, false otherwise.
-    */
-  def isEmpty: Boolean = fields.isEmpty
-
-  /** Collects all property names of this DataMap in a set.
-    *
-    * @return a set containing all property names of this DataMap.
-    */
-  def keySet: Set[String] = fields.keySet
-
-  /** Converts this DataMap to a List.
-    *
-    * @return a list of (property name, JSON value) tuples.
-    */
-  def toList(): List[(String, JValue)] = fields.toList
-
-  /** Converts this DataMap to a JObject.
-    *
-    * @return the JObject initialized by this DataMap.
-    */
-  def toJObject(): JObject = JObject(toList())
-
-  /** Converts this DataMap to case class of type T.
-    *
-    * @return the object of type T.
-    */
-  def extract[T: Manifest]: T = toJObject().extract[T]
-
-  override def toString: String = s"DataMap($fields)"
-
-  override def hashCode: Int = 41 + fields.hashCode
-
-  /** Two DataMaps are equal when both agree on `canEqual` and their `fields`
-    * maps are equal.
-    */
-  override def equals(other: Any): Boolean = other match {
-    case that: DataMap => that.canEqual(this) && this.fields.equals(that.fields)
-    case _ => false
-  }
-
-  def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap]
-}
-
-/** Companion object of the [[DataMap]] class
-  *
-  * Provides the factory methods for building a DataMap from nothing, from a
-  * field map, from a JObject or from a JSON string.
-  *
-  * @group Event Data
-  */
-object DataMap {
-  /** Create an empty DataMap
-    * @return an empty DataMap
-    */
-  def apply(): DataMap = new DataMap(Map.empty[String, JValue])
-
-  /** Create a DataMap from a Map of String to JValue
-    * @param fields a Map of String to JValue
-    * @return a new DataMap initialized by fields
-    */
-  def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields)
-
-  /** Create a DataMap from a JObject. A null JObject yields an empty DataMap.
-    * @param jObj JObject
-    * @return a new DataMap initialized by a JObject
-    */
-  def apply(jObj: JObject): DataMap =
-    Option(jObj).fold(apply())(nonNull => new DataMap(nonNull.obj.toMap))
-
-  /** Create a DataMap from a JSON String. The parsed value is cast to JObject.
-    * @param js JSON String. eg """{ "a": 1, "b": "foo" }"""
-    * @return a new DataMap initialized by a JSON string
-    */
-  def apply(js: String): DataMap = {
-    val parsed = parse(js).asInstanceOf[JObject]
-    apply(parsed)
-  }
-
-}