You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:48 UTC
[17/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
new file mode 100644
index 0000000..2d5939f
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/workflow/EvaluationWorkflowTest.scala
@@ -0,0 +1,61 @@
+package org.apache.predictionio.workflow
+
+import org.apache.predictionio.controller._
+
+import org.scalatest.FunSuite
+import org.scalatest.Matchers._
+
+class EvaluationWorkflowSuite extends FunSuite with SharedSparkContext {
+
+ test("Evaluation return best engine params, simple result type: Double") {
+ val engine = new Engine1()
+ val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
+ val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+ val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+ val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
+ val engineParamsList = Seq(ep0, ep1, ep2, ep3)
+
+ val evaluator = MetricEvaluator(new Metric0())
+
+ object Eval extends Evaluation {
+ engineEvaluator = (new Engine1(), MetricEvaluator(new Metric0()))
+ }
+
+ val result = EvaluationWorkflow.runEvaluation(
+ sc,
+ Eval,
+ engine,
+ engineParamsList,
+ evaluator,
+ WorkflowParams())
+
+ result.bestScore.score shouldBe 0.3
+ result.bestEngineParams shouldBe ep1
+ }
+
+ test("Evaluation return best engine params, complex result type") {
+ val engine = new Engine1()
+ val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
+ val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+ val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
+ val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
+ val engineParamsList = Seq(ep0, ep1, ep2, ep3)
+
+ val evaluator = MetricEvaluator(new Metric1())
+
+ object Eval extends Evaluation {
+ engineEvaluator = (new Engine1(), MetricEvaluator(new Metric1()))
+ }
+
+ val result = EvaluationWorkflow.runEvaluation(
+ sc,
+ Eval,
+ engine,
+ engineParamsList,
+ evaluator,
+ WorkflowParams())
+
+ result.bestScore.score shouldBe Metric1.Result(0, 0.3)
+ result.bestEngineParams shouldBe ep1
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
new file mode 100644
index 0000000..217f416
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/workflow/JsonExtractorSuite.scala
@@ -0,0 +1,383 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.workflow
+
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.controller.Utils
+import org.json4s.CustomSerializer
+import org.json4s.JsonAST.JField
+import org.json4s.JsonAST.JObject
+import org.json4s.JsonAST.JString
+import org.json4s.MappingException
+import org.json4s.native.JsonMethods.compact
+import org.json4s.native.JsonMethods.render
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+
+class JsonExtractorSuite extends FunSuite with Matchers {
+
+ test("Extract Scala object using option Json4sNative works with optional and default value " +
+ "provided") {
+
+ val json = """{"string": "query string", "optional": "optional string", "default": "d"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Json4sNative,
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", Some("optional string"), "d"))
+ }
+
+ test("Extract Scala object using option Json4sNative works with no optional and no default " +
+ "value provided") {
+
+ val json = """{"string": "query string"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Json4sNative,
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", None, "default"))
+ }
+
+ test("Extract Scala object using option Json4sNative works with null optional and null default" +
+ " value") {
+
+ val json = """{"string": "query string", "optional": null, "default": null}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Json4sNative,
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", None, "default"))
+ }
+
+ test("Extract Scala object using option Both works with optional and default value provided") {
+
+ val json = """{"string": "query string", "optional": "optional string", "default": "d"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Both, // the option this test's name says it covers
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", Some("optional string"), "d"))
+ }
+
+ test("Extract Scala object using option Both works with no optional and no default value " +
+ "provided") {
+
+ val json = """{"string": "query string"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Both, // the option this test's name says it covers
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", None, "default"))
+ }
+
+ test("Extract Scala object using option Both works with null optional and null default value") {
+
+ val json = """{"string": "query string", "optional": null, "default": null}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Both, // the option this test's name says it covers
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", None, "default"))
+ }
+
+ test("Extract Scala object using option Gson should not get default value and optional none" +
+ " value") {
+
+ val json = """{"string": "query string"}"""
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Gson,
+ json,
+ classOf[ScalaQuery])
+
+ query should be (ScalaQuery("query string", null, null))
+ }
+
+ test("Extract Scala object using option Gson should throw an exception with optional " +
+ "value provided") {
+
+ val json = """{"string": "query string", "optional": "o", "default": "d"}"""
+ intercept[RuntimeException] {
+ JsonExtractor.extract(
+ JsonExtractorOption.Gson,
+ json,
+ classOf[ScalaQuery])
+ }
+ }
+
+ test("Extract Java object using option Gson works") {
+
+ val json = """{"q": "query string"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Gson,
+ json,
+ classOf[JavaQuery])
+
+ query should be (new JavaQuery("query string"))
+ }
+
+ test("Extract Java object using option Both works") {
+
+ val json = """{"q": "query string"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Both,
+ json,
+ classOf[JavaQuery])
+
+ query should be (new JavaQuery("query string"))
+ }
+
+ test("Extract Java object using option Json4sNative should throw an exception") {
+
+ val json = """{"q": "query string"}"""
+
+ intercept[MappingException] {
+ JsonExtractor.extract(
+ JsonExtractorOption.Json4sNative,
+ json,
+ classOf[JavaQuery])
+ }
+ }
+
+ test("Extract Scala object using option Json4sNative with custom deserializer") {
+ val json = """{"string": "query string", "optional": "o", "default": "d"}"""
+
+ val query = JsonExtractor.extract(
+ JsonExtractorOption.Json4sNative,
+ json,
+ classOf[ScalaQuery],
+ Utils.json4sDefaultFormats + new UpperCaseFormat
+ )
+
+ query should be(ScalaQuery("QUERY STRING", Some("O"), "D"))
+ }
+
+ test("Extract Java object using option Gson with custom deserializer") {
+ val json = """{"q": "query string"}"""
+
+ val query = JsonExtractor.extract(
+ extractorOption = JsonExtractorOption.Gson,
+ json = json,
+ clazz = classOf[JavaQuery],
+ gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
+ )
+
+ query should be(new JavaQuery("QUERY STRING")) // input upper-cased, presumably by the adapter
+ }
+
+ test("Java object to JValue using option Both works") {
+ val query = new JavaQuery("query string")
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
+
+ compact(render(jValue)) should be ("""{"q":"query string"}""")
+ }
+
+ test("Java object to JValue using option Gson works") {
+ val query = new JavaQuery("query string")
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
+
+ compact(render(jValue)) should be ("""{"q":"query string"}""")
+ }
+
+ test("Java object to JValue using option Json4sNative results in empty Json") {
+ val query = new JavaQuery("query string")
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
+
+ compact(render(jValue)) should be ("""{}""")
+ }
+
+ test("Scala object to JValue using option Both works") {
+ val query = new ScalaQuery("query string", Some("option"))
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
+
+ compact(render(jValue)) should
+ be ("""{"string":"query string","optional":"option","default":"default"}""")
+ }
+
+ test("Scala object to JValue using option Gson does not serialize optional") {
+ val query = new ScalaQuery("query string", Some("option"))
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
+
+ compact(render(jValue)) should
+ be ("""{"string":"query string","optional":{},"default":"default"}""")
+ }
+
+ test("Scala object to JValue using option Json4sNative works") {
+ val query = new ScalaQuery("query string", Some("option"))
+ val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
+
+ compact(render(jValue)) should
+ be ("""{"string":"query string","optional":"option","default":"default"}""")
+ }
+
+ test("Scala object to JValue using option Json4sNative with custom serializer") {
+ val query = new ScalaQuery("query string", Some("option"))
+ val jValue = JsonExtractor.toJValue(
+ JsonExtractorOption.Json4sNative,
+ query,
+ Utils.json4sDefaultFormats + new UpperCaseFormat
+ )
+
+ compact(render(jValue)) should
+ be ("""{"string":"QUERY STRING","optional":"OPTION","default":"DEFAULT"}""")
+ }
+
+ test("Java object to JValue using option Gson with custom serializer") {
+ val query = new JavaQuery("query string")
+ val jValue = JsonExtractor.toJValue(
+ extractorOption = JsonExtractorOption.Gson,
+ o = query,
+ gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
+ )
+
+ compact(render(jValue)) should be ("""{"q":"QUERY STRING"}""")
+ }
+
+ test("Java Param to Json using option Both") {
+ val param = ("algo", new JavaParams("parameter"))
+ val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
+
+ json should be ("""{"algo":{"p":"parameter"}}""")
+ }
+
+ test("Java Param to Json using option Gson") {
+ val param = ("algo", new JavaParams("parameter"))
+ val json = JsonExtractor.paramToJson(JsonExtractorOption.Gson, param)
+
+ json should be ("""{"algo":{"p":"parameter"}}""")
+ }
+
+ test("Scala Param to Json using option Both") {
+ val param = ("algo", AlgorithmParams("parameter"))
+ val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
+
+ json should be ("""{"algo":{"a":"parameter"}}""")
+ }
+
+ test("Scala Param to Json using option Json4sNative") {
+ val param = ("algo", AlgorithmParams("parameter"))
+ val json = JsonExtractor.paramToJson(JsonExtractorOption.Json4sNative, param)
+
+ json should be ("""{"algo":{"a":"parameter"}}""")
+ }
+
+ test("Java Params to Json using option Both") {
+ val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2")))
+ val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+ json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
+ }
+
+ test("Java Params to Json using option Gson") {
+ val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2")))
+ val json = JsonExtractor.paramsToJson(JsonExtractorOption.Gson, params)
+
+ json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
+ }
+
+ test("Scala Params to Json using option Both") {
+ val params =
+ Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2")))
+ val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+ json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
+ }
+
+ test("Scala Params to Json using option Json4sNative") {
+ val params =
+ Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2")))
+ val json = JsonExtractor.paramsToJson(JsonExtractorOption.Json4sNative, params)
+
+ json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
+ }
+
+ test("Mixed Java and Scala Params to Json using option Both") {
+ val params =
+ Seq(("scala", AlgorithmParams("parameter")), ("java", new JavaParams("parameter2")))
+ val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
+
+ json should be ("""[{"scala":{"a":"parameter"}},{"java":{"p":"parameter2"}}]""")
+ }
+
+ test("Serializing Scala EngineParams works using option Json4sNative") {
+ val ep = new EngineParams(
+ dataSourceParams = ("ds", DataSourceParams("dsp")),
+ algorithmParamsList = Seq(("a0", AlgorithmParams("ap"))))
+
+ val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Json4sNative, ep)
+
+ json should be (
+ """{"dataSourceParams":{"ds":{"a":"dsp"}},"preparatorParams":{"":{}},""" +
+ """"algorithmParamsList":[{"a0":{"a":"ap"}}],"servingParams":{"":{}}}""")
+ }
+
+ test("Serializing Java EngineParams works using option Gson") {
+ val ep = new EngineParams(
+ dataSourceParams = ("ds", new JavaParams("dsp")),
+ algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2"))))
+
+ val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Gson, ep)
+
+ json should be (
+ """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" +
+ """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
+ }
+
+ test("Serializing Java EngineParams works using option Both") {
+ val ep = new EngineParams(
+ dataSourceParams = ("ds", new JavaParams("dsp")),
+ algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2"))))
+
+ val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Both, ep)
+
+ json should be (
+ """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" +
+ """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
+ }
+}
+
+private case class AlgorithmParams(a: String) extends Params
+
+private case class DataSourceParams(a: String) extends Params
+
+private case class ScalaQuery(string: String, optional: Option[String], default: String = "default")
+
+private class UpperCaseFormat extends CustomSerializer[ScalaQuery](format => ( {
+ case JObject(JField("string", JString(string)) ::
+ JField("optional", JString(optional)) ::
+ JField("default", JString(default)) ::
+ Nil) => ScalaQuery(string.toUpperCase, Some(optional.toUpperCase), default.toUpperCase)
+}, {
+ case x: ScalaQuery =>
+ JObject(
+ JField("string", JString(x.string.toUpperCase)),
+ JField("optional", JString(x.optional.get.toUpperCase)),
+ JField("default", JString(x.default.toUpperCase)))
+}))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/Utils.scala b/data/src/main/scala/io/prediction/data/Utils.scala
deleted file mode 100644
index 78b71cc..0000000
--- a/data/src/main/scala/io/prediction/data/Utils.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data
-
-import org.joda.time.DateTime
-import org.joda.time.format.ISODateTimeFormat
-
-import java.lang.IllegalArgumentException
-
-private[prediction] object Utils {
-
- // use dateTime() for strict ISO8601 format
- val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed()
-
- val dateTimeNoMillisFormatter =
- ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed()
-
- def stringToDateTime(dt: String): DateTime = {
- // We accept two formats.
- // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
- // 2. "yyyy-MM-dd'T'HH:mm:ssZZ"
- // The first one also takes milliseconds into account.
- try {
- // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
- dateTimeFormatter.parseDateTime(dt)
- } catch {
- case e: IllegalArgumentException => {
- // handle when the datetime string doesn't specify milliseconds.
- dateTimeNoMillisFormatter.parseDateTime(dt)
- }
- }
- }
-
- def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt)
- // dt.toString
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Common.scala b/data/src/main/scala/io/prediction/data/api/Common.scala
deleted file mode 100644
index 6681a1d..0000000
--- a/data/src/main/scala/io/prediction/data/api/Common.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.ConnectorException
-import io.prediction.data.storage.StorageException
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.routing.Rejection
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-
-object Common {
-
- object Json4sProtocol extends Json4sSupport {
- implicit def json4sFormats: Formats = DefaultFormats
- }
-
- import Json4sProtocol._
-
- val rejectionHandler = RejectionHandler {
- case MalformedRequestContentRejection(msg, _) :: _ =>
- complete(StatusCodes.BadRequest, Map("message" -> msg))
- case MissingQueryParamRejection(msg) :: _ =>
- complete(StatusCodes.NotFound,
- Map("message" -> s"missing required query parameter ${msg}."))
- case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
- val msg = cause match {
- case AuthenticationFailedRejection.CredentialsRejected =>
- "Invalid accessKey."
- case AuthenticationFailedRejection.CredentialsMissing =>
- "Missing accessKey."
- }
- complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
- }
- case ChannelRejection(msg) :: _ =>
- complete(StatusCodes.Unauthorized, Map("message" -> msg))
- case NonExistentAppRejection(msg) :: _ =>
- complete(StatusCodes.Unauthorized, Map("message" -> msg))
- }
-
- val exceptionHandler = ExceptionHandler {
- case e: ConnectorException => {
- val msg = s"${e.getMessage()}"
- complete(StatusCodes.BadRequest, Map("message" -> msg))
- }
- case e: StorageException => {
- val msg = s"${e.getMessage()}"
- complete(StatusCodes.InternalServerError, Map("message" -> msg))
- }
- case e: Exception => {
- val msg = s"${e.getMessage()}"
- complete(StatusCodes.InternalServerError, Map("message" -> msg))
- }
- }
-}
-
-/** invalid channel */
-case class ChannelRejection(msg: String) extends Rejection
-
-/** the app doesn't exist */
-case class NonExistentAppRejection(msg: String) extends Rejection
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventInfo.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventInfo.scala b/data/src/main/scala/io/prediction/data/api/EventInfo.scala
deleted file mode 100644
index 1e324c2..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventInfo.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-case class EventInfo(
- appId: Int,
- channelId: Option[Int],
- event: Event)
-
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServer.scala b/data/src/main/scala/io/prediction/data/api/EventServer.scala
deleted file mode 100644
index 139f964..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServer.scala
+++ /dev/null
@@ -1,640 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import akka.event.Logging
-import sun.misc.BASE64Decoder
-
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
-import io.prediction.data.Utils
-import io.prediction.data.storage.AccessKeys
-import io.prediction.data.storage.Channels
-import io.prediction.data.storage.DateTimeJson4sSupport
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.BatchEventsJson4sSupport
-import io.prediction.data.storage.LEvents
-import io.prediction.data.storage.Storage
-import org.json4s.DefaultFormats
-import org.json4s.Formats
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-import spray.can.Http
-import spray.http.FormData
-import spray.http.MediaTypes
-import spray.http.StatusCodes
-import spray.httpx.Json4sSupport
-import spray.routing._
-import spray.routing.authentication.Authentication
-
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.{Try, Success, Failure}
-
-class EventServiceActor(
- val eventClient: LEvents,
- val accessKeysClient: AccessKeys,
- val channelsClient: Channels,
- val config: EventServerConfig) extends HttpServiceActor {
-
- object Json4sProtocol extends Json4sSupport {
- implicit def json4sFormats: Formats = DefaultFormats +
- new EventJson4sSupport.APISerializer +
- new BatchEventsJson4sSupport.APISerializer +
- // NOTE: don't use Json4s JodaTimeSerializers since it has issues,
- // some format not converted, or timezone not correct
- new DateTimeJson4sSupport.Serializer
- }
-
-
- val MaxNumberOfEventsPerBatchRequest = 50
-
- val logger = Logging(context.system, this)
-
- // we use the enclosing ActorContext's or ActorSystem's dispatcher for our
- // Futures
- implicit def executionContext: ExecutionContext = context.dispatcher
-
- implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
- val rejectionHandler = Common.rejectionHandler
-
- val jsonPath = """(.+)\.json$""".r
- val formPath = """(.+)\.form$""".r
-
- val pluginContext = EventServerPluginContext(logger)
-
- private lazy val base64Decoder = new BASE64Decoder
-
- case class AuthData(appId: Int, channelId: Option[Int], events: Seq[String])
-
- /* with accessKey in query/header, return appId if succeed */
- def withAccessKey: RequestContext => Future[Authentication[AuthData]] = {
- ctx: RequestContext =>
- val accessKeyParamOpt = ctx.request.uri.query.get("accessKey")
- val channelParamOpt = ctx.request.uri.query.get("channel")
- Future {
- // with accessKey in query, return appId if succeed
- accessKeyParamOpt.map { accessKeyParam =>
- accessKeysClient.get(accessKeyParam).map { k =>
- channelParamOpt.map { ch =>
- val channelMap =
- channelsClient.getByAppid(k.appid)
- .map(c => (c.name, c.id)).toMap
- if (channelMap.contains(ch)) {
- Right(AuthData(k.appid, Some(channelMap(ch)), k.events))
- } else {
- Left(ChannelRejection(s"Invalid channel '$ch'."))
- }
- }.getOrElse{
- Right(AuthData(k.appid, None, k.events))
- }
- }.getOrElse(FailedAuth)
- }.getOrElse {
- // with accessKey in header, return appId if succeed
- ctx.request.headers.find(_.name == "Authorization").map { authHeader ⇒
- authHeader.value.split("Basic ") match {
- case Array(_, value) ⇒
- val appAccessKey =
- new String(base64Decoder.decodeBuffer(value)).trim.split(":")(0)
- accessKeysClient.get(appAccessKey) match {
- case Some(k) ⇒ Right(AuthData(k.appid, None, k.events))
- case None ⇒ FailedAuth
- }
-
- case _ ⇒ FailedAuth
- }
- }.getOrElse(MissedAuth)
- }
- }
- }
-
- private val FailedAuth = Left(
- AuthenticationFailedRejection(
- AuthenticationFailedRejection.CredentialsRejected, List()
- )
- )
-
- private val MissedAuth = Left(
- AuthenticationFailedRejection(
- AuthenticationFailedRejection.CredentialsMissing, List()
- )
- )
-
- lazy val statsActorRef = actorRefFactory.actorSelection("/user/StatsActor")
- lazy val pluginsActorRef = actorRefFactory.actorSelection("/user/PluginsActor")
-
- val route: Route =
- pathSingleSlash {
- import Json4sProtocol._
-
- get {
- respondWithMediaType(MediaTypes.`application/json`) {
- complete(Map("status" -> "alive"))
- }
- }
- } ~
- path("plugins.json") {
- import Json4sProtocol._
- get {
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- Map("plugins" -> Map(
- "inputblockers" -> pluginContext.inputBlockers.map { case (n, p) =>
- n -> Map(
- "name" -> p.pluginName,
- "description" -> p.pluginDescription,
- "class" -> p.getClass.getName)
- },
- "inputsniffers" -> pluginContext.inputSniffers.map { case (n, p) =>
- n -> Map(
- "name" -> p.pluginName,
- "description" -> p.pluginDescription,
- "class" -> p.getClass.getName)
- }
- ))
- }
- }
- }
- } ~
- path("plugins" / Segments) { segments =>
- get {
- handleExceptions(Common.exceptionHandler) {
- authenticate(withAccessKey) { authData =>
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- val pluginArgs = segments.drop(2)
- val pluginType = segments(0)
- val pluginName = segments(1)
- pluginType match {
- case EventServerPlugin.inputBlocker =>
- pluginContext.inputBlockers(pluginName).handleREST(
- authData.appId,
- authData.channelId,
- pluginArgs)
- case EventServerPlugin.inputSniffer =>
- pluginsActorRef ? PluginsActor.HandleREST(
- appId = authData.appId,
- channelId = authData.channelId,
- pluginName = pluginName,
- pluginArgs = pluginArgs) map {
- _.asInstanceOf[String]
- }
- }
- }
- }
- }
- }
- }
- } ~
- path("events" / jsonPath ) { eventId =>
-
- import Json4sProtocol._
-
- get {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- logger.debug(s"GET event ${eventId}.")
- val data = eventClient.futureGet(eventId, appId, channelId).map { eventOpt =>
- eventOpt.map( event =>
- (StatusCodes.OK, event)
- ).getOrElse(
- (StatusCodes.NotFound, Map("message" -> "Not Found"))
- )
- }
- data
- }
- }
- }
- }
- }
- } ~
- delete {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- logger.debug(s"DELETE event ${eventId}.")
- val data = eventClient.futureDelete(eventId, appId, channelId).map { found =>
- if (found) {
- (StatusCodes.OK, Map("message" -> "Found"))
- } else {
- (StatusCodes.NotFound, Map("message" -> "Not Found"))
- }
- }
- data
- }
- }
- }
- }
- }
- }
- } ~
- path("events.json") {
-
- import Json4sProtocol._
-
- post {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- val events = authData.events
- entity(as[Event]) { event =>
- complete {
- if (events.isEmpty || authData.events.contains(event.event)) {
- pluginContext.inputBlockers.values.foreach(
- _.process(EventInfo(
- appId = appId,
- channelId = channelId,
- event = event), pluginContext))
- val data = eventClient.futureInsert(event, appId, channelId).map { id =>
- pluginsActorRef ! EventInfo(
- appId = appId,
- channelId = channelId,
- event = event)
- val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
- if (config.stats) {
- statsActorRef ! Bookkeeping(appId, result._1, event)
- }
- result
- }
- data
- } else {
- (StatusCodes.Forbidden,
- Map("message" -> s"${event.event} events are not allowed"))
- }
- }
- }
- }
- }
- }
- } ~
- get {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- parameters(
- 'startTime.as[Option[String]],
- 'untilTime.as[Option[String]],
- 'entityType.as[Option[String]],
- 'entityId.as[Option[String]],
- 'event.as[Option[String]],
- 'targetEntityType.as[Option[String]],
- 'targetEntityId.as[Option[String]],
- 'limit.as[Option[Int]],
- 'reversed.as[Option[Boolean]]) {
- (startTimeStr, untilTimeStr, entityType, entityId,
- eventName, // only support one event name
- targetEntityType, targetEntityId,
- limit, reversed) =>
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- logger.debug(
- s"GET events of appId=${appId} " +
- s"st=${startTimeStr} ut=${untilTimeStr} " +
- s"et=${entityType} eid=${entityId} " +
- s"li=${limit} rev=${reversed} ")
-
- require(!((reversed == Some(true))
- && (entityType.isEmpty || entityId.isEmpty)),
- "the parameter reversed can only be used with" +
- " both entityType and entityId specified.")
-
- val parseTime = Future {
- val startTime = startTimeStr.map(Utils.stringToDateTime(_))
- val untilTime = untilTimeStr.map(Utils.stringToDateTime(_))
- (startTime, untilTime)
- }
-
-
- parseTime.flatMap { case (startTime, untilTime) =>
- val data = eventClient.futureFind(
- appId = appId,
- channelId = channelId,
- startTime = startTime,
- untilTime = untilTime,
- entityType = entityType,
- entityId = entityId,
- eventNames = eventName.map(List(_)),
- targetEntityType = targetEntityType.map(Some(_)),
- targetEntityId = targetEntityId.map(Some(_)),
- limit = limit.orElse(Some(20)),
- reversed = reversed)
- .map { eventIter =>
- if (eventIter.hasNext) {
- (StatusCodes.OK, eventIter.toArray)
- } else {
- (StatusCodes.NotFound,
- Map("message" -> "Not Found"))
- }
- }
- data
- }.recover {
- case e: Exception =>
- (StatusCodes.BadRequest, Map("message" -> s"${e}"))
- }
- }
- }
- }
- }
- }
- }
- }
- } ~
- path("batch" / "events.json") {
-
- import Json4sProtocol._
-
- post {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- val allowedEvents = authData.events
- val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
- case Success(event) => {
- if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
- pluginContext.inputBlockers.values.foreach(
- _.process(EventInfo(
- appId = appId,
- channelId = channelId,
- event = event), pluginContext))
- val data = eventClient.futureInsert(event, appId, channelId).map { id =>
- pluginsActorRef ! EventInfo(
- appId = appId,
- channelId = channelId,
- event = event)
- val status = StatusCodes.Created
- val result = Map(
- "status" -> status.intValue,
- "eventId" -> s"${id}")
- if (config.stats) {
- statsActorRef ! Bookkeeping(appId, status, event)
- }
- result
- }.recover { case exception =>
- Map(
- "status" -> StatusCodes.InternalServerError.intValue,
- "message" -> s"${exception.getMessage()}")
- }
- data
- } else {
- Future.successful(Map(
- "status" -> StatusCodes.Forbidden.intValue,
- "message" -> s"${event.event} events are not allowed"))
- }
- }
- case Failure(exception) => {
- Future.successful(Map(
- "status" -> StatusCodes.BadRequest.intValue,
- "message" -> s"${exception.getMessage()}"))
- }
- }
-
- entity(as[Seq[Try[Event]]]) { events =>
- complete {
- if (events.length <= MaxNumberOfEventsPerBatchRequest) {
- Future.traverse(events)(handleEvent)
- } else {
- (StatusCodes.BadRequest,
- Map("message" -> (s"Batch request must have less than or equal to " +
- s"${MaxNumberOfEventsPerBatchRequest} events")))
- }
- }
- }
- }
- }
- }
- }
- } ~
- path("stats.json") {
-
- import Json4sProtocol._
-
- get {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- respondWithMediaType(MediaTypes.`application/json`) {
- if (config.stats) {
- complete {
- statsActorRef ? GetStats(appId) map {
- _.asInstanceOf[Map[String, StatsSnapshot]]
- }
- }
- } else {
- complete(
- StatusCodes.NotFound,
- parse("""{"message": "To see stats, launch Event Server """ +
- """with --stats argument."}"""))
- }
- }
- }
- }
- }
- } // stats.json get
- } ~
- path("webhooks" / jsonPath ) { web =>
- import Json4sProtocol._
-
- post {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- entity(as[JObject]) { jObj =>
- complete {
- Webhooks.postJson(
- appId = appId,
- channelId = channelId,
- web = web,
- data = jObj,
- eventClient = eventClient,
- log = logger,
- stats = config.stats,
- statsActorRef = statsActorRef)
- }
- }
- }
- }
- }
- }
- } ~
- get {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- Webhooks.getJson(
- appId = appId,
- channelId = channelId,
- web = web,
- log = logger)
- }
- }
- }
- }
- }
- }
- } ~
- path("webhooks" / formPath ) { web =>
- post {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- entity(as[FormData]){ formData =>
- // logger.debug(formData.toString)
- complete {
- // respond with JSON
- import Json4sProtocol._
-
- Webhooks.postForm(
- appId = appId,
- channelId = channelId,
- web = web,
- data = formData,
- eventClient = eventClient,
- log = logger,
- stats = config.stats,
- statsActorRef = statsActorRef)
- }
- }
- }
- }
- }
- }
- } ~
- get {
- handleExceptions(Common.exceptionHandler) {
- handleRejections(rejectionHandler) {
- authenticate(withAccessKey) { authData =>
- val appId = authData.appId
- val channelId = authData.channelId
- respondWithMediaType(MediaTypes.`application/json`) {
- complete {
- // respond with JSON
- import Json4sProtocol._
-
- Webhooks.getForm(
- appId = appId,
- channelId = channelId,
- web = web,
- log = logger)
- }
- }
- }
- }
- }
- }
-
- }
-
- def receive: Actor.Receive = runRoute(route)
-}
-
-
-
-/* message */
-case class StartServer(host: String, port: Int)
-
-class EventServerActor(
- val eventClient: LEvents,
- val accessKeysClient: AccessKeys,
- val channelsClient: Channels,
- val config: EventServerConfig) extends Actor with ActorLogging {
- val child = context.actorOf(
- Props(classOf[EventServiceActor],
- eventClient,
- accessKeysClient,
- channelsClient,
- config),
- "EventServiceActor")
- implicit val system = context.system
-
- def receive: Actor.Receive = {
- case StartServer(host, portNum) => {
- IO(Http) ! Http.Bind(child, interface = host, port = portNum)
- }
- case m: Http.Bound => log.info("Bound received. EventServer is ready.")
- case m: Http.CommandFailed => log.error("Command failed.")
- case _ => log.error("Unknown message.")
- }
-}
-
-case class EventServerConfig(
- ip: String = "localhost",
- port: Int = 7070,
- plugins: String = "plugins",
- stats: Boolean = false)
-
-object EventServer {
- def createEventServer(config: EventServerConfig): Unit = {
- implicit val system = ActorSystem("EventServerSystem")
-
- val eventClient = Storage.getLEvents()
- val accessKeysClient = Storage.getMetaDataAccessKeys()
- val channelsClient = Storage.getMetaDataChannels()
-
- val serverActor = system.actorOf(
- Props(
- classOf[EventServerActor],
- eventClient,
- accessKeysClient,
- channelsClient,
- config),
- "EventServerActor"
- )
- if (config.stats) system.actorOf(Props[StatsActor], "StatsActor")
- system.actorOf(Props[PluginsActor], "PluginsActor")
- serverActor ! StartServer(config.ip, config.port)
- system.awaitTermination()
- }
-}
-
-object Run {
- def main(args: Array[String]) {
- EventServer.createEventServer(EventServerConfig(
- ip = "0.0.0.0",
- port = 7070))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala b/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
deleted file mode 100644
index a87fc84..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServerPlugin.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-trait EventServerPlugin {
- val pluginName: String
- val pluginDescription: String
- val pluginType: String
-
- def start(context: EventServerPluginContext): Unit
-
- def process(eventInfo: EventInfo, context: EventServerPluginContext)
-
- def handleREST(appId: Int, channelId: Option[Int], arguments: Seq[String]): String
-}
-
-object EventServerPlugin {
- val inputBlocker = "inputblocker"
- val inputSniffer = "inputsniffer"
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala b/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
deleted file mode 100644
index 1d8d36e..0000000
--- a/data/src/main/scala/io/prediction/data/api/EventServerPluginContext.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import java.util.ServiceLoader
-
-import akka.event.LoggingAdapter
-import grizzled.slf4j.Logging
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-
-class EventServerPluginContext(
- val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]],
- val log: LoggingAdapter) {
- def inputBlockers: Map[String, EventServerPlugin] =
- plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap
-
- def inputSniffers: Map[String, EventServerPlugin] =
- plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap
-}
-
-object EventServerPluginContext extends Logging {
- def apply(log: LoggingAdapter): EventServerPluginContext = {
- val plugins = mutable.Map[String, mutable.Map[String, EventServerPlugin]](
- EventServerPlugin.inputBlocker -> mutable.Map(),
- EventServerPlugin.inputSniffer -> mutable.Map())
- val serviceLoader = ServiceLoader.load(classOf[EventServerPlugin])
- serviceLoader foreach { service =>
- plugins(service.pluginType) += service.pluginName -> service
- }
- new EventServerPluginContext(
- plugins,
- log)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala b/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
deleted file mode 100644
index 7883adf..0000000
--- a/data/src/main/scala/io/prediction/data/api/PluginsActor.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import akka.actor.Actor
-import akka.event.Logging
-
-class PluginsActor() extends Actor {
- implicit val system = context.system
- val log = Logging(system, this)
-
- val pluginContext = EventServerPluginContext(log)
-
- def receive: PartialFunction[Any, Unit] = {
- case e: EventInfo =>
- pluginContext.inputSniffers.values.foreach(_.process(e, pluginContext))
- case h: PluginsActor.HandleREST =>
- try {
- sender() ! pluginContext.inputSniffers(h.pluginName).handleREST(
- h.appId,
- h.channelId,
- h.pluginArgs)
- } catch {
- case e: Exception =>
- sender() ! s"""{"message":"${e.getMessage}"}"""
- }
- case _ =>
- log.error("Unknown message sent to Event Server input sniffer plugin host.")
- }
-}
-
-object PluginsActor {
- case class HandleREST(
- pluginName: String,
- appId: Int,
- channelId: Option[Int],
- pluginArgs: Seq[String])
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Stats.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Stats.scala b/data/src/main/scala/io/prediction/data/api/Stats.scala
deleted file mode 100644
index ca5f05e..0000000
--- a/data/src/main/scala/io/prediction/data/api/Stats.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-import spray.http.StatusCode
-
-import scala.collection.mutable.{ HashMap => MHashMap }
-import scala.collection.mutable
-
-import com.github.nscala_time.time.Imports.DateTime
-
-case class EntityTypesEvent(
- val entityType: String,
- val targetEntityType: Option[String],
- val event: String) {
-
- def this(e: Event) = this(
- e.entityType,
- e.targetEntityType,
- e.event)
-}
-
-case class KV[K, V](key: K, value: V)
-
-case class StatsSnapshot(
- val startTime: DateTime,
- val endTime: Option[DateTime],
- val basic: Seq[KV[EntityTypesEvent, Long]],
- val statusCode: Seq[KV[StatusCode, Long]]
-)
-
-
-class Stats(val startTime: DateTime) {
- private[this] var _endTime: Option[DateTime] = None
- var statusCodeCount = MHashMap[(Int, StatusCode), Long]().withDefaultValue(0L)
- var eteCount = MHashMap[(Int, EntityTypesEvent), Long]().withDefaultValue(0L)
-
- def cutoff(endTime: DateTime) {
- _endTime = Some(endTime)
- }
-
- def update(appId: Int, statusCode: StatusCode, event: Event) {
- statusCodeCount((appId, statusCode)) += 1
- eteCount((appId, new EntityTypesEvent(event))) += 1
- }
-
- def extractByAppId[K, V](appId: Int, m: mutable.Map[(Int, K), V])
- : Seq[KV[K, V]] = {
- m
- .toSeq
- .flatMap { case (k, v) =>
- if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() }
- }
- }
-
- def get(appId: Int): StatsSnapshot = {
- StatsSnapshot(
- startTime,
- _endTime,
- extractByAppId(appId, eteCount),
- extractByAppId(appId, statusCodeCount)
- )
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/StatsActor.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/StatsActor.scala b/data/src/main/scala/io/prediction/data/api/StatsActor.scala
deleted file mode 100644
index 857352f..0000000
--- a/data/src/main/scala/io/prediction/data/api/StatsActor.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Event
-
-import spray.http.StatusCode
-
-import akka.actor.Actor
-import akka.event.Logging
-
-import com.github.nscala_time.time.Imports.DateTime
-
-/* message to StatsActor */
-case class Bookkeeping(val appId: Int, statusCode: StatusCode, event: Event)
-
-/* message to StatsActor */
-case class GetStats(val appId: Int)
-
-class StatsActor extends Actor {
- implicit val system = context.system
- val log = Logging(system, this)
-
- def getCurrent: DateTime = {
- DateTime.now.
- withMinuteOfHour(0).
- withSecondOfMinute(0).
- withMillisOfSecond(0)
- }
-
- var longLiveStats = new Stats(DateTime.now)
- var hourlyStats = new Stats(getCurrent)
-
- var prevHourlyStats = new Stats(getCurrent.minusHours(1))
- prevHourlyStats.cutoff(hourlyStats.startTime)
-
- def bookkeeping(appId: Int, statusCode: StatusCode, event: Event) {
- val current = getCurrent
- // If the current hour is different from the stats start time, we create
- // another stats instance, and move the current to prev.
- if (current != hourlyStats.startTime) {
- prevHourlyStats = hourlyStats
- prevHourlyStats.cutoff(current)
- hourlyStats = new Stats(current)
- }
-
- hourlyStats.update(appId, statusCode, event)
- longLiveStats.update(appId, statusCode, event)
- }
-
- def receive: Actor.Receive = {
- case Bookkeeping(appId, statusCode, event) =>
- bookkeeping(appId, statusCode, event)
- case GetStats(appId) => sender() ! Map(
- "time" -> DateTime.now,
- "currentHour" -> hourlyStats.get(appId),
- "prevHour" -> prevHourlyStats.get(appId),
- "longLive" -> longLiveStats.get(appId))
- case _ => log.error("Unknown message.")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/Webhooks.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/Webhooks.scala b/data/src/main/scala/io/prediction/data/api/Webhooks.scala
deleted file mode 100644
index ff18888..0000000
--- a/data/src/main/scala/io/prediction/data/api/Webhooks.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorUtil
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-import io.prediction.data.storage.LEvents
-
-import spray.routing._
-import spray.routing.Directives._
-import spray.http.StatusCodes
-import spray.http.StatusCode
-import spray.http.FormData
-import spray.httpx.Json4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-
-import akka.event.LoggingAdapter
-import akka.actor.ActorSelection
-
-import scala.concurrent.{ExecutionContext, Future}
-
-
-private[prediction] object Webhooks {
-
- def postJson(
- appId: Int,
- channelId: Option[Int],
- web: String,
- data: JObject,
- eventClient: LEvents,
- log: LoggingAdapter,
- stats: Boolean,
- statsActorRef: ActorSelection
- )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
-
- val eventFuture = Future {
- WebhooksConnectors.json.get(web).map { connector =>
- ConnectorUtil.toEvent(connector, data)
- }
- }
-
- eventFuture.flatMap { eventOpt =>
- if (eventOpt.isEmpty) {
- Future successful {
- val message = s"webhooks connection for ${web} is not supported."
- (StatusCodes.NotFound, Map("message" -> message))
- }
- } else {
- val event = eventOpt.get
- val data = eventClient.futureInsert(event, appId, channelId).map { id =>
- val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-
- if (stats) {
- statsActorRef ! Bookkeeping(appId, result._1, event)
- }
- result
- }
- data
- }
- }
- }
-
- def getJson(
- appId: Int,
- channelId: Option[Int],
- web: String,
- log: LoggingAdapter
- )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
- Future {
- WebhooksConnectors.json.get(web).map { connector =>
- (StatusCodes.OK, Map("message" -> "Ok"))
- }.getOrElse {
- val message = s"webhooks connection for ${web} is not supported."
- (StatusCodes.NotFound, Map("message" -> message))
- }
- }
- }
-
- def postForm(
- appId: Int,
- channelId: Option[Int],
- web: String,
- data: FormData,
- eventClient: LEvents,
- log: LoggingAdapter,
- stats: Boolean,
- statsActorRef: ActorSelection
- )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
- val eventFuture = Future {
- WebhooksConnectors.form.get(web).map { connector =>
- ConnectorUtil.toEvent(connector, data.fields.toMap)
- }
- }
-
- eventFuture.flatMap { eventOpt =>
- if (eventOpt.isEmpty) {
- Future {
- val message = s"webhooks connection for ${web} is not supported."
- (StatusCodes.NotFound, Map("message" -> message))
- }
- } else {
- val event = eventOpt.get
- val data = eventClient.futureInsert(event, appId, channelId).map { id =>
- val result = (StatusCodes.Created, Map("eventId" -> s"${id}"))
-
- if (stats) {
- statsActorRef ! Bookkeeping(appId, result._1, event)
- }
- result
- }
- data
- }
- }
- }
-
- def getForm(
- appId: Int,
- channelId: Option[Int],
- web: String,
- log: LoggingAdapter
- )(implicit ec: ExecutionContext): Future[(StatusCode, Map[String, String])] = {
- Future {
- WebhooksConnectors.form.get(web).map { connector =>
- (StatusCodes.OK, Map("message" -> "Ok"))
- }.getOrElse {
- val message = s"webhooks connection for ${web} is not supported."
- (StatusCodes.NotFound, Map("message" -> message))
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala b/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
deleted file mode 100644
index 97c9775..0000000
--- a/data/src/main/scala/io/prediction/data/api/WebhooksConnectors.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.FormConnector
-
-import io.prediction.data.webhooks.segmentio.SegmentIOConnector
-import io.prediction.data.webhooks.mailchimp.MailChimpConnector
-
-private[prediction] object WebhooksConnectors {
-
- val json: Map[String, JsonConnector] = Map(
- "segmentio" -> SegmentIOConnector
- )
-
- val form: Map[String, FormConnector] = Map(
- "mailchimp" -> MailChimpConnector
- )
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/package.scala b/data/src/main/scala/io/prediction/data/package.scala
deleted file mode 100644
index afbe573..0000000
--- a/data/src/main/scala/io/prediction/data/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction
-
-/** Provides data access for PredictionIO and any engines running on top of
- * PredictionIO
- */
-package object data {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala b/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
deleted file mode 100644
index f197e78..0000000
--- a/data/src/main/scala/io/prediction/data/storage/AccessKeys.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import java.security.SecureRandom
-
-import io.prediction.annotation.DeveloperApi
-import org.apache.commons.codec.binary.Base64
-
-/** :: DeveloperApi ::
- * Stores mapping of access keys, app IDs, and lists of allowed event names
- *
- * @param key Access key
- * @param appid App ID
- * @param events List of allowed events for this particular app key
- * @group Meta Data
- */
-@DeveloperApi
-case class AccessKey(
- key: String,
- appid: Int,
- events: Seq[String])
-
-/** :: DeveloperApi ::
- * Base trait of the [[AccessKey]] data access object
- *
- * @group Meta Data
- */
-@DeveloperApi
-trait AccessKeys {
- /** Insert a new [[AccessKey]]. If the key field is empty, a key will be
- * generated.
- */
- def insert(k: AccessKey): Option[String]
-
- /** Get an [[AccessKey]] by key */
- def get(k: String): Option[AccessKey]
-
- /** Get all [[AccessKey]]s */
- def getAll(): Seq[AccessKey]
-
- /** Get all [[AccessKey]]s for a particular app ID */
- def getByAppid(appid: Int): Seq[AccessKey]
-
- /** Update an [[AccessKey]] */
- def update(k: AccessKey): Unit
-
- /** Delete an [[AccessKey]] */
- def delete(k: String): Unit
-
- /** Default implementation of key generation */
- def generateKey: String = {
- val sr = SecureRandom.getInstanceStrong
- val srBytes = Array.fill(48)(0.toByte)
- sr.nextBytes(srBytes)
- Base64.encodeBase64URLSafeString(srBytes)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Apps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Apps.scala b/data/src/main/scala/io/prediction/data/storage/Apps.scala
deleted file mode 100644
index 32343e1..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Apps.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-
-/** :: DeveloperApi ::
- * Stores mapping of app IDs and names
- *
- * @param id ID of the app.
- * @param name Name of the app.
- * @param description Long description of the app.
- * @group Meta Data
- */
-@DeveloperApi
-case class App(
- id: Int,
- name: String,
- description: Option[String])
-
-/** :: DeveloperApi ::
- * Base trait of the [[App]] data access object
- *
- * @group Meta Data
- */
-@DeveloperApi
-trait Apps {
- /** Insert a new [[App]]. Returns a generated app ID if the supplied app ID is 0. */
- def insert(app: App): Option[Int]
-
- /** Get an [[App]] by app ID */
- def get(id: Int): Option[App]
-
- /** Get an [[App]] by app name */
- def getByName(name: String): Option[App]
-
- /** Get all [[App]]s */
- def getAll(): Seq[App]
-
- /** Update an [[App]] */
- def update(app: App): Unit
-
- /** Delete an [[App]] */
- def delete(id: Int): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/BiMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/BiMap.scala b/data/src/main/scala/io/prediction/data/storage/BiMap.scala
deleted file mode 100644
index cbf3e12..0000000
--- a/data/src/main/scala/io/prediction/data/storage/BiMap.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import scala.collection.immutable.HashMap
-
-import org.apache.spark.rdd.RDD
-
-/** Immutable Bi-directional Map
- *
- */
-class BiMap[K, V] private[prediction] (
- private val m: Map[K, V],
- private val i: Option[BiMap[V, K]] = None
- ) extends Serializable {
-
- // NOTE: make inverse's inverse point back to current BiMap
- val inverse: BiMap[V, K] = i.getOrElse {
- val rev = m.map(_.swap)
- require((rev.size == m.size),
- s"Failed to create reversed map. Cannot have duplicated values.")
- new BiMap(rev, Some(this))
- }
-
- def get(k: K): Option[V] = m.get(k)
-
- def getOrElse(k: K, default: => V): V = m.getOrElse(k, default)
-
- def contains(k: K): Boolean = m.contains(k)
-
- def apply(k: K): V = m.apply(k)
-
- /** Converts to a map.
- * @return a map of type immutable.Map[K, V]
- */
- def toMap: Map[K, V] = m
-
- /** Converts to a sequence.
- * @return a sequence containing all elements of this map
- */
- def toSeq: Seq[(K, V)] = m.toSeq
-
- def size: Int = m.size
-
- def take(n: Int): BiMap[K, V] = BiMap(m.take(n))
-
- override def toString: String = m.toString
-}
-
-object BiMap {
-
- def apply[K, V](x: Map[K, V]): BiMap[K, V] = new BiMap(x)
-
- /** Create a BiMap[String, Long] from a set of String. The Long index starts
- * from 0.
- * @param keys a set of String
- * @return a String to Long BiMap
- */
- def stringLong(keys: Set[String]): BiMap[String, Long] = {
- val hm = HashMap(keys.toSeq.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*)
- new BiMap(hm)
- }
-
- /** Create a BiMap[String, Long] from an array of String.
- * NOTE: the the array cannot have duplicated element.
- * The Long index starts from 0.
- * @param keys a set of String
- * @return a String to Long BiMap
- */
- def stringLong(keys: Array[String]): BiMap[String, Long] = {
- val hm = HashMap(keys.zipWithIndex.map(t => (t._1, t._2.toLong)) : _*)
- new BiMap(hm)
- }
-
- /** Create a BiMap[String, Long] from RDD[String]. The Long index starts
- * from 0.
- * @param keys RDD of String
- * @return a String to Long BiMap
- */
- def stringLong(keys: RDD[String]): BiMap[String, Long] = {
- stringLong(keys.distinct.collect)
- }
-
- /** Create a BiMap[String, Int] from a set of String. The Int index starts
- * from 0.
- * @param keys a set of String
- * @return a String to Int BiMap
- */
- def stringInt(keys: Set[String]): BiMap[String, Int] = {
- val hm = HashMap(keys.toSeq.zipWithIndex : _*)
- new BiMap(hm)
- }
-
- /** Create a BiMap[String, Int] from an array of String.
- * NOTE: the the array cannot have duplicated element.
- * The Int index starts from 0.
- * @param keys a set of String
- * @return a String to Int BiMap
- */
- def stringInt(keys: Array[String]): BiMap[String, Int] = {
- val hm = HashMap(keys.zipWithIndex : _*)
- new BiMap(hm)
- }
-
- /** Create a BiMap[String, Int] from RDD[String]. The Int index starts
- * from 0.
- * @param keys RDD of String
- * @return a String to Int BiMap
- */
- def stringInt(keys: RDD[String]): BiMap[String, Int] = {
- stringInt(keys.distinct.collect)
- }
-
- private[this] def stringDoubleImpl(keys: Seq[String])
- : BiMap[String, Double] = {
- val ki = keys.zipWithIndex.map(e => (e._1, e._2.toDouble))
- new BiMap(HashMap(ki : _*))
- }
-
- /** Create a BiMap[String, Double] from a set of String. The Double index
- * starts from 0.
- * @param keys a set of String
- * @return a String to Double BiMap
- */
- def stringDouble(keys: Set[String]): BiMap[String, Double] = {
- // val hm = HashMap(keys.toSeq.zipWithIndex.map(_.toDouble) : _*)
- // new BiMap(hm)
- stringDoubleImpl(keys.toSeq)
- }
-
- /** Create a BiMap[String, Double] from an array of String.
- * NOTE: the the array cannot have duplicated element.
- * The Double index starts from 0.
- * @param keys a set of String
- * @return a String to Double BiMap
- */
- def stringDouble(keys: Array[String]): BiMap[String, Double] = {
- // val hm = HashMap(keys.zipWithIndex.mapValues(_.toDouble) : _*)
- // new BiMap(hm)
- stringDoubleImpl(keys.toSeq)
- }
-
- /** Create a BiMap[String, Double] from RDD[String]. The Double index starts
- * from 0.
- * @param keys RDD of String
- * @return a String to Double BiMap
- */
- def stringDouble(keys: RDD[String]): BiMap[String, Double] = {
- stringDoubleImpl(keys.distinct.collect)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/Channels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/Channels.scala b/data/src/main/scala/io/prediction/data/storage/Channels.scala
deleted file mode 100644
index 3fa7aef..0000000
--- a/data/src/main/scala/io/prediction/data/storage/Channels.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import io.prediction.annotation.DeveloperApi
-
-/** :: DeveloperApi ::
- * Stores mapping of channel IDs, names and app ID
- *
- * @param id ID of the channel
- * @param name Name of the channel (must be unique within the same app)
- * @param appid ID of the app which this channel belongs to
- * @group Meta Data
- */
-@DeveloperApi
-case class Channel(
- id: Int,
- name: String, // must be unique within the same app
- appid: Int
-) {
- require(Channel.isValidName(name),
- "Invalid channel name: ${name}. ${Channel.nameConstraint}")
-}
-
-/** :: DeveloperApi ::
- * Companion object of [[Channel]]
- *
- * @group Meta Data
- */
-@DeveloperApi
-object Channel {
- /** Examine whether the supplied channel name is valid. A valid channel name
- * must consists of 1 to 16 alphanumeric and '-' characters.
- *
- * @param s Channel name to examine
- * @return true if channel name is valid, false otherwise
- */
- def isValidName(s: String): Boolean = {
- // note: update channelNameConstraint if this rule is changed
- s.matches("^[a-zA-Z0-9-]{1,16}$")
- }
-
- /** For consistent error message display */
- val nameConstraint: String =
- "Only alphanumeric and - characters are allowed and max length is 16."
-}
-
-/** :: DeveloperApi ::
- * Base trait of the [[Channel]] data access object
- *
- * @group Meta Data
- */
-@DeveloperApi
-trait Channels {
- /** Insert a new [[Channel]]. Returns a generated channel ID if original ID is 0. */
- def insert(channel: Channel): Option[Int]
-
- /** Get a [[Channel]] by channel ID */
- def get(id: Int): Option[Channel]
-
- /** Get all [[Channel]] by app ID */
- def getByAppid(appid: Int): Seq[Channel]
-
- /** Delete a [[Channel]] */
- def delete(id: Int): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/storage/DataMap.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/storage/DataMap.scala b/data/src/main/scala/io/prediction/data/storage/DataMap.scala
deleted file mode 100644
index 91a0ba5..0000000
--- a/data/src/main/scala/io/prediction/data/storage/DataMap.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.json4s._
-import org.json4s.native.JsonMethods.parse
-
-import scala.collection.GenTraversableOnce
-import scala.collection.JavaConversions
-
-/** Exception class for [[DataMap]]
- *
- * @group Event Data
- */
-case class DataMapException(msg: String, cause: Exception)
- extends Exception(msg, cause) {
- def this(msg: String) = this(msg, null)
-}
-
-/** A DataMap stores properties of the event or entity. Internally it is a Map
- * whose keys are property names and values are corresponding JSON values
- * respectively. Use the [[get]] method to retrieve the value of a mandatory
- * property or use [[getOpt]] to retrieve the value of an optional property.
- *
- * @param fields Map of property name to JValue
- * @group Event Data
- */
-class DataMap (
- val fields: Map[String, JValue]
-) extends Serializable {
- @transient lazy implicit private val formats = DefaultFormats +
- new DateTimeJson4sSupport.Serializer
-
- /** Check the existence of a required property name. Throw an exception if
- * it does not exist.
- *
- * @param name The property name
- */
- def require(name: String): Unit = {
- if (!fields.contains(name)) {
- throw new DataMapException(s"The field $name is required.")
- }
- }
-
- /** Check if this DataMap contains a specific property.
- *
- * @param name The property name
- * @return Return true if the property exists, else false.
- */
- def contains(name: String): Boolean = {
- fields.contains(name)
- }
-
- /** Get the value of a mandatory property. Exception is thrown if the property
- * does not exist.
- *
- * @tparam T The type of the property value
- * @param name The property name
- * @return Return the property value of type T
- */
- def get[T: Manifest](name: String): T = {
- require(name)
- fields(name) match {
- case JNull => throw new DataMapException(
- s"The required field $name cannot be null.")
- case x: JValue => x.extract[T]
- }
- }
-
- /** Get the value of an optional property. Return None if the property does
- * not exist.
- *
- * @tparam T The type of the property value
- * @param name The property name
- * @return Return the property value of type Option[T]
- */
- def getOpt[T: Manifest](name: String): Option[T] = {
- // either the field doesn't exist or its value is null
- fields.get(name).flatMap(_.extract[Option[T]])
- }
-
- /** Get the value of an optional property. Return default value if the
- * property does not exist.
- *
- * @tparam T The type of the property value
- * @param name The property name
- * @param default The default property value of type T
- * @return Return the property value of type T
- */
- def getOrElse[T: Manifest](name: String, default: T): T = {
- getOpt[T](name).getOrElse(default)
- }
-
- /** Java-friendly method for getting the value of a property. Return null if the
- * property does not exist.
- *
- * @tparam T The type of the property value
- * @param name The property name
- * @param clazz The class of the type of the property value
- * @return Return the property value of type T
- */
- def get[T](name: String, clazz: java.lang.Class[T]): T = {
- val manifest = new Manifest[T] {
- override def erasure: Class[_] = clazz
- override def runtimeClass: Class[_] = clazz
- }
-
- fields.get(name) match {
- case None => null.asInstanceOf[T]
- case Some(JNull) => null.asInstanceOf[T]
- case Some(x) => x.extract[T](formats, manifest)
- }
- }
-
- /** Java-friendly method for getting a list of values of a property. Return null if the
- * property does not exist.
- *
- * @param name The property name
- * @return Return the list of property values
- */
- def getStringList(name: String): java.util.List[String] = {
- fields.get(name) match {
- case None => null
- case Some(JNull) => null
- case Some(x) =>
- JavaConversions.seqAsJavaList(x.extract[List[String]](formats, manifest[List[String]]))
- }
- }
-
- /** Return a new DataMap with elements containing elements from the left hand
- * side operand followed by elements from the right hand side operand.
- *
- * @param that Right hand side DataMap
- * @return A new DataMap
- */
- def ++ (that: DataMap): DataMap = DataMap(this.fields ++ that.fields)
-
- /** Creates a new DataMap from this DataMap by removing all elements of
- * another collection.
- *
- * @param that A collection containing the removed property names
- * @return A new DataMap
- */
- def -- (that: GenTraversableOnce[String]): DataMap =
- DataMap(this.fields -- that)
-
- /** Tests whether the DataMap is empty.
- *
- * @return true if the DataMap is empty, false otherwise.
- */
- def isEmpty: Boolean = fields.isEmpty
-
- /** Collects all property names of this DataMap in a set.
- *
- * @return a set containing all property names of this DataMap.
- */
- def keySet: Set[String] = this.fields.keySet
-
- /** Converts this DataMap to a List.
- *
- * @return a list of (property name, JSON value) tuples.
- */
- def toList(): List[(String, JValue)] = fields.toList
-
- /** Converts this DataMap to a JObject.
- *
- * @return the JObject initialized by this DataMap.
- */
- def toJObject(): JObject = JObject(toList())
-
- /** Converts this DataMap to case class of type T.
- *
- * @return the object of type T.
- */
- def extract[T: Manifest]: T = {
- toJObject().extract[T]
- }
-
- override
- def toString: String = s"DataMap($fields)"
-
- override
- def hashCode: Int = 41 + fields.hashCode
-
- override
- def equals(other: Any): Boolean = other match {
- case that: DataMap => that.canEqual(this) && this.fields.equals(that.fields)
- case _ => false
- }
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[DataMap]
-}
-
-/** Companion object of the [[DataMap]] class
- *
- * @group Event Data
- */
-object DataMap {
- /** Create an empty DataMap
- * @return an empty DataMap
- */
- def apply(): DataMap = new DataMap(Map[String, JValue]())
-
- /** Create an DataMap from a Map of String to JValue
- * @param fields a Map of String to JValue
- * @return a new DataMap initialized by fields
- */
- def apply(fields: Map[String, JValue]): DataMap = new DataMap(fields)
-
- /** Create an DataMap from a JObject
- * @param jObj JObject
- * @return a new DataMap initialized by a JObject
- */
- def apply(jObj: JObject): DataMap = {
- if (jObj == null) {
- apply()
- } else {
- new DataMap(jObj.obj.toMap)
- }
- }
-
- /** Create an DataMap from a JSON String
- * @param js JSON String. eg """{ "a": 1, "b": "foo" }"""
- * @return a new DataMap initialized by a JSON string
- */
- def apply(js: String): DataMap = apply(parse(js).asInstanceOf[JObject])
-
-}