You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:39 UTC
[08/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
new file mode 100644
index 0000000..5bd7478
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
@@ -0,0 +1,200 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+import scala.language.implicitConversions
+
+import scala.concurrent.ExecutionContext.Implicits.global // TODO
+
+@deprecated("Use LEvents or LEventStore instead.", "0.9.2")
+object ViewPredicates {
+  /** Predicate on an optional start time; `None` accepts every event.
+   *
+   * @param startTimeOpt inclusive lower bound on `eventTime`, if any
+   */
+  def getStartTimePredicate(startTimeOpt: Option[DateTime])
+    : (Event => Boolean) = {
+    startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true)
+  }
+
+  /** Accepts events whose `eventTime` is at or after `startTime`.
+   *
+   * Inclusive on purpose: together with the exclusive until-time predicate
+   * below, a pair of bounds selects the half-open interval
+   * [startTime, untilTime). An event exactly on a boundary therefore belongs
+   * to exactly one of two adjacent ranges instead of being dropped by both.
+   */
+  def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = {
+    e => !e.eventTime.isBefore(startTime)
+  }
+
+  /** Predicate on an optional until time; `None` accepts every event.
+   *
+   * @param untilTimeOpt exclusive upper bound on `eventTime`, if any
+   */
+  def getUntilTimePredicate(untilTimeOpt: Option[DateTime])
+    : (Event => Boolean) = {
+    untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true)
+  }
+
+  /** Accepts events whose `eventTime` is strictly before `untilTime`. */
+  def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = {
+    _.eventTime.isBefore(untilTime)
+  }
+
+  /** Predicate on an optional entity type; `None` accepts every event. */
+  def getEntityTypePredicate(entityTypeOpt: Option[String])
+    : (Event => Boolean) = {
+    entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true)
+  }
+
+  /** Accepts events whose `entityType` equals `entityType`. */
+  def getEntityTypePredicate(entityType: String): (Event => Boolean) = {
+    (_.entityType == entityType)
+  }
+
+  /** Predicate on an optional event name; `None` accepts every event. */
+  def getEventPredicate(eventOpt: Option[String]): (Event => Boolean) = {
+    eventOpt.map(getEventPredicate).getOrElse(_ => true)
+  }
+
+  /** Accepts events whose `event` name equals `event`. */
+  def getEventPredicate(event: String): (Event => Boolean) = {
+    (_.event == event)
+  }
+}
+
+@deprecated("Use LEvents instead.", "0.9.2")
+object ViewAggregators {
+  /** Returns a fold step that replays special events onto an accumulated
+   * property map:
+   *
+   *  - `$set` merges the event's properties over the accumulator, starting
+   *    from the event's properties when nothing has been accumulated yet;
+   *  - `$unset` removes the event's property keys from an existing map;
+   *  - `$delete` clears the accumulator;
+   *  - any other event leaves the accumulator unchanged.
+   */
+  def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = {
+    (acc, event) => event.event match {
+      case "$set" =>
+        acc match {
+          case Some(current) => Some(current ++ event.properties)
+          case None => Some(event.properties)
+        }
+      case "$unset" =>
+        acc.map(current => current -- event.properties.keySet)
+      case "$delete" =>
+        None
+      case _ =>
+        acc
+    }
+  }
+}
+
+@deprecated("Use LEvents instead.", "0.9.2")
+object EventSeq {
+  // Implicit conversions between EventSeq and List[Event]. Defining them
+  // needs `scala.language.implicitConversions` in scope (imported at the top
+  // of this file). Limit further imports of that feature to code that really
+  // relies on these conversions, to avoid confusion.
+
+  /** Unwraps an [[EventSeq]] into its underlying event list. */
+  implicit def eventSeqToList(es: EventSeq): List[Event] = {
+    es.events
+  }
+
+  /** Wraps an event list into an [[EventSeq]]. */
+  implicit def listToEventSeq(l: List[Event]): EventSeq = {
+    new EventSeq(events = l)
+  }
+}
+
+
+@deprecated("Use LEvents instead.", "0.9.2")
+class EventSeq(val events: List[Event]) {
+  /** Keeps the events that satisfy every supplied criterion; a `None`
+   * criterion is not applied.
+   *
+   * @param eventOpt      event name to match exactly
+   * @param entityTypeOpt entity type to match exactly
+   * @param startTimeOpt  lower bound on `eventTime`, applied through
+   *                      `ViewPredicates.getStartTimePredicate`
+   * @param untilTimeOpt  exclusive upper bound on `eventTime`
+   * @return a new [[EventSeq]] that preserves the original event order
+   */
+  def filter(
+    eventOpt: Option[String] = None,
+    entityTypeOpt: Option[String] = None,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None): EventSeq = {
+
+    new EventSeq(
+      events
+        .filter(ViewPredicates.getEventPredicate(eventOpt))
+        .filter(ViewPredicates.getStartTimePredicate(startTimeOpt))
+        .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt))
+        .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt)))
+  }
+
+  /** Keeps the events satisfying `p`, preserving their order. */
+  def filter(p: (Event => Boolean)): EventSeq = new EventSeq(events.filter(p))
+
+  /** Folds each entity's events in `eventTime` order.
+   *
+   * Events are grouped by `entityId`, sorted by `eventTime` in milliseconds
+   * (a stable sort, so events with equal times keep their list order) and
+   * folded with `op` starting from `init`.
+   *
+   * @return entityId -> folded value
+   */
+  def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T)
+    : Map[String, T] = {
+    // Built eagerly with `map`. `mapValues(...).toMap` on an immutable Map
+    // returns the lazy `mapValues` view itself, which re-runs the sort and
+    // fold on every access and is not serializable (SI-7005).
+    events
+      .groupBy(_.entityId)
+      .map { case (entityId, entityEvents) =>
+        entityId -> entityEvents.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)
+      }
+  }
+}
+
+
+@deprecated("Use LEventStore instead.", "0.9.2")
+class LBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime]) {
+
+  @transient lazy val eventsDb = Storage.getLEvents()
+
+  @transient lazy val _events = eventsDb.find(
+    appId = appId,
+    startTime = startTime,
+    untilTime = untilTime).toList
+
+  @transient lazy val events: EventSeq = new EventSeq(_events)
+
+  /** Aggregates `$set` / `$unset` / `$delete` events into the current
+   * properties of each entity of the given type.
+   *
+   * @param entityType   only aggregate events with this entity type
+   * @param startTimeOpt if specified, only aggregate events whose
+   *                     `eventTime` is at or after (inclusive) this time
+   * @param untilTimeOpt if specified, only aggregate events whose
+   *                     `eventTime` is before (exclusive) this time
+   * @return entityId -> properties; entities with no surviving properties
+   *         (e.g. deleted by their latest special event) are omitted
+   */
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): Map[String, DataMap] = {
+
+    // Apply the documented time bounds: start inclusive, until exclusive.
+    val inRange: Event => Boolean = { e =>
+      startTimeOpt.forall(t => !e.eventTime.isBefore(t)) &&
+      untilTimeOpt.forall(t => e.eventTime.isBefore(t))
+    }
+
+    events
+      .filter(e =>
+        e.entityType == entityType &&
+        EventValidation.isSpecialEvents(e.event) &&
+        inRange(e))
+      .aggregateByEntityOrdered[Option[DataMap]](
+        init = None,
+        op = ViewAggregators.getDataMapAggregator())
+      // `collect` builds a concrete Map and drops entities without
+      // properties; `mapValues(_.get)` would return a lazy,
+      // non-serializable view (SI-7005).
+      .collect { case (entityId, Some(props)) => entityId -> props }
+  }
+
+  /*
+  def aggregateByEntityOrdered[T](
+    predicate: Event => Boolean,
+    init: T,
+    op: (T, Event) => T): Map[String, T] = {
+
+    _events
+      .filter( predicate(_) )
+      .groupBy( _.entityId )
+      .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
+      .toMap
+
+  }
+  */
+
+  /*
+  def groupByEntityOrdered[T](
+    predicate: Event => Boolean,
+    map: Event => T): Map[String, Seq[T]] = {
+
+    _events
+      .filter( predicate(_) )
+      .groupBy( _.entityId )
+      .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) )
+      .toMap
+  }
+  */
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
new file mode 100644
index 0000000..6c75402
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
@@ -0,0 +1,209 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.hbase.HBPEvents
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+// A property value paired with the time it was set. `EventOp.apply` fills
+// `t` with the event time in epoch milliseconds.
+private[predictionio] case class PropTime(d: JValue, t: Long) extends Serializable
+
+/** Properties accumulated from `$set` events.
+ *
+ * @param fields latest value of each property together with its set time
+ * @param t      time of the latest `$set` seen. Note: `fields` could be
+ *               empty even with a valid set time
+ */
+private[predictionio] case class SetProp(
+  fields: Map[String, PropTime],
+  t: Long) extends Serializable {
+
+  /** Merges two accumulators. For each property the value with the later
+   * time wins (ties favour `that`); the result keeps the later set time.
+   */
+  def ++ (that: SetProp): SetProp = {
+    val combinedFields = that.fields.foldLeft(this.fields) {
+      case (acc, (k, thatData)) =>
+        acc.get(k) match {
+          // keep this side's value only when it is strictly newer
+          case Some(thisData) if thisData.t > thatData.t => acc
+          case _ => acc.updated(k, thatData)
+        }
+    }
+
+    SetProp(
+      fields = combinedFields,
+      t = math.max(this.t, that.t)
+    )
+  }
+}
+
+/** Property keys removed by `$unset` events, each with its latest unset time. */
+private[predictionio] case class UnsetProp(fields: Map[String, Long]) extends Serializable {
+
+  /** Merges two accumulators, keeping the later unset time for every key. */
+  def ++ (that: UnsetProp): UnsetProp = {
+    val combinedFields = that.fields.foldLeft(this.fields) {
+      case (acc, (k, thatT)) =>
+        acc.updated(k, acc.get(k).fold(thatT)(thisT => math.max(thisT, thatT)))
+    }
+
+    UnsetProp(
+      fields = combinedFields
+    )
+  }
+}
+
+/** Latest `$delete` time seen for an entity. */
+private[predictionio] case class DeleteEntity(t: Long) extends Serializable {
+  /** Keeps the later of the two deletions (ties favour `that`). */
+  def ++ (that: DeleteEntity): DeleteEntity = {
+    if (this.t > that.t) this else that
+  }
+}
+
+/** Per-entity accumulator of special events. Accumulators are combined
+ * with `++` and resolved into the entity's properties with `toDataMap`.
+ */
+private[predictionio] case class EventOp(
+  setProp: Option[SetProp] = None,
+  unsetProp: Option[UnsetProp] = None,
+  deleteEntity: Option[DeleteEntity] = None
+) extends Serializable {
+
+  /** Combines two accumulators component-wise. A component present on only
+   * one side is taken as is; one present on both sides is merged with its
+   * own `++`.
+   */
+  def ++ (that: EventOp): EventOp = {
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
+    )
+  }
+
+  /** Resolves the accumulated operations into the entity's properties.
+   *
+   * A property survives if it was set and was neither unset nor deleted at
+   * or after the time it was set.
+   *
+   * @return `None` if nothing was ever set, or if a `$delete` happened at or
+   *         after the latest `$set`; otherwise the surviving properties
+   */
+  def toDataMap(): Option[DataMap] = {
+    setProp.flatMap { set =>
+
+      // Keys unset at or after their latest set. A key that was unset but
+      // never set has nothing to remove; it is skipped instead of looked up
+      // with `set.fields(k)`, which would throw NoSuchElementException.
+      val unsetKeys: Set[String] = unsetProp.map { unset =>
+        unset.fields.filter { case (k, unsetT) =>
+          set.fields.get(k).exists(prop => unsetT >= prop.t)
+        }.keySet
+      }.getOrElse(Set.empty[String])
+
+      val combinedFields: Option[Map[String, PropTime]] = deleteEntity match {
+        case Some(delete) if delete.t >= set.t =>
+          None
+        case Some(delete) =>
+          // properties set at or before the deletion are gone as well
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (_, PropTime(_, t)) => delete.t >= t }
+            .keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        case None =>
+          Some(set.fields -- unsetKeys)
+      }
+
+      // Build a concrete Map: mapValues() doesn't return a concrete Map and
+      // causes a NotSerializableException issue.
+      // see https://issues.scala-lang.org/browse/SI-7005
+      combinedFields.map(fields => DataMap(fields.map { case (k, prop) => k -> prop.d }))
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  /** Lifts a single event into an [[EventOp]], stamped with the event time in
+   * epoch milliseconds. Events other than `$set`, `$unset` and `$delete`
+   * become an empty [[EventOp]].
+   */
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" =>
+        // `map` (not `mapValues`) yields a concrete, serializable Map (SI-7005)
+        val fields = e.properties.fields.map { case (k, jv) => k -> PropTime(jv, t) }
+        EventOp(
+          setProp = Some(SetProp(fields = fields, t = t))
+        )
+      case "$unset" =>
+        val fields = e.properties.fields.map { case (k, _) => k -> t }
+        EventOp(
+          unsetProp = Some(UnsetProp(fields = fields))
+        )
+      case "$delete" =>
+        EventOp(
+          deleteEntity = Some(DeleteEntity(t))
+        )
+      case _ =>
+        EventOp()
+    }
+  }
+}
+
+@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
+class PBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime],
+  val sc: SparkContext) {
+
+  // NOTE: parallel Events DB interface
+  @transient lazy val eventsDb = Storage.getPEvents()
+
+  @transient lazy val _events: RDD[Event] =
+    eventsDb.getByAppIdAndTimeAndEntity(
+      appId = appId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = None,
+      entityId = None)(sc)
+
+  // TODO: change to use EventSeq?
+  @transient lazy val events: RDD[Event] = _events
+
+  /** Aggregates `$set` / `$unset` / `$delete` events into the current
+   * properties of each entity of the given type.
+   *
+   * @param entityType   only aggregate events with this entity type
+   * @param startTimeOpt if specified, only aggregate events whose
+   *                     `eventTime` is at or after (inclusive) this time
+   * @param untilTimeOpt if specified, only aggregate events whose
+   *                     `eventTime` is before (exclusive) this time
+   * @return (entityId, properties) pairs; entities without surviving
+   *         properties (never set, or deleted at or after the latest set)
+   *         are omitted
+   */
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): RDD[(String, DataMap)] = {
+
+    _events
+      .filter { e =>
+        e.entityType == entityType &&
+        EventValidation.isSpecialEvents(e.event) &&
+        // documented time bounds: start inclusive, until exclusive
+        startTimeOpt.forall(t => !e.eventTime.isBefore(t)) &&
+        untilTimeOpt.forall(t => e.eventTime.isBefore(t))
+      }
+      .map(e => (e.entityId, EventOp(e)))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = { case (u, v) => u ++ v },
+        // across partition
+        combOp = { case (accu, u) => accu ++ u }
+      )
+      .mapValues(_.toDataMap)
+      .collect { case (entityId, Some(props)) => (entityId, props) }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
new file mode 100644
index 0000000..eba3276
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
@@ -0,0 +1,94 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import scala.concurrent.ExecutionContext.Implicits.global // TODO
+
+import grizzled.slf4j.Logger
+import org.joda.time.DateTime
+
+import scala.language.implicitConversions
+
+/** Manual smoke test: prints the events of entity `pio_user` / `3` in app 1,
+ * read through the LEvents store returned by `Storage.getLEvents()`.
+ */
+class TestHBLEvents() {
+  @transient lazy val eventsDb = Storage.getLEvents()
+
+  def run(): Unit = {
+    val found: List[Event] = eventsDb
+      .find(
+        appId = 1,
+        startTime = None,
+        untilTime = None,
+        entityType = Some("pio_user"),
+        entityId = Some("3"))
+      .toList
+
+    println(found)
+  }
+}
+
+/** Manual smoke test: prints all events of app `appId` loaded through
+ * [[LBatchView]] with no time bounds.
+ */
+class TestSource(val appId: Int) {
+  @transient lazy val logger = Logger[this.type]
+  @transient lazy val batchView = new LBatchView(appId,
+    None, None)
+
+  def run(): Unit = {
+    // EventSeq does not override toString, so printing the wrapper would only
+    // show its class name and hash; print the underlying event list instead.
+    println(batchView.events.events)
+  }
+}
+
+/** Entry point for running the [[TestHBLEvents]] smoke test by hand. */
+object QuickTest {
+
+  def main(args: Array[String]): Unit = {
+    new TestHBLEvents().run()
+
+    // val ts = new TestSource(args(0).toInt)
+    // ts.run()
+  }
+}
+
+/** Manual check of [[EventSeq]] time filtering and property aggregation
+ * for app 9.
+ */
+object TestEventTime {
+  @transient lazy val batchView = new LBatchView(9, None, None)
+
+  /** Prints three empty lines to separate output sections. */
+  private def printGap(): Unit = (1 to 3).foreach(_ => println())
+
+  def main(args: Array[String]): Unit = {
+    val rated = batchView.events.filter(
+      eventOpt = Some("rate"),
+      startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0))
+      // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0))
+    )
+
+    rated.events.foreach(println)
+    printGap()
+
+    batchView.aggregateProperties("pio_item").foreach(println)
+    printGap()
+
+    rated.events.map(_.entityId).foreach(println)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
new file mode 100644
index 0000000..ee47a9c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks
+
+/** Exception raised when a webhooks connector cannot convert incoming data.
+ *
+ * @param message the detail message
+ * @param cause the cause (may be null)
+ */
+private[predictionio] class ConnectorException(message: String, cause: Throwable)
+  extends Exception(message, cause) {
+
+  /** Webhooks Connector Exception with the cause set to null.
+   *
+   * @param message the detail message
+   */
+  def this(message: String) = this(message, null)
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
new file mode 100644
index 0000000..40feb98
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+
+/** Converts webhook payloads into [[Event]]s using a [[JsonConnector]] or a
+ * [[FormConnector]].
+ */
+private[predictionio] object ConnectorUtil {
+
+  implicit val eventJson4sFormats: Formats = DefaultFormats +
+    new EventJson4sSupport.APISerializer
+
+  // Intentionally round-trip through JSON with EventJson4sSupport.APISerializer
+  // instead of letting a connector build an Event object directly. Enforcing
+  // the JSON format keeps Event formation consistent across connectors.
+
+  /** Converts a JSON webhook payload into an [[Event]].
+   *
+   * @param connector maps the raw payload to Event API JSON
+   * @param data      raw JSON payload received through the webhook
+   */
+  def toEvent(connector: JsonConnector, data: JObject): Event = {
+    read[Event](write(connector.toEventJson(data)))
+  }
+
+  /** Converts a form-submission webhook payload into an [[Event]].
+   *
+   * @param connector maps the form fields to Event API JSON
+   * @param data      form key-value pairs received through the webhook
+   */
+  def toEvent(connector: FormConnector, data: Map[String, String]): Event = {
+    read[Event](write(connector.toEventJson(data)))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
new file mode 100644
index 0000000..dd04a21
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
@@ -0,0 +1,32 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks
+
+import org.json4s.JObject
+
+/** Connector for Webhooks connection with Form submission data format.
+ * Its output is turned into an Event by `ConnectorUtil.toEvent`.
+ */
+private[predictionio] trait FormConnector {
+
+  // TODO: support conversion to multiple events?
+
+  /** Convert from original Form submission data to Event JObject
+   *
+   * @param data Map of key-value pairs in String type received through webhooks
+   * @return Event JObject
+   */
+  def toEventJson(data: Map[String, String]): JObject
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
new file mode 100644
index 0000000..eda8059
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks
+
+import org.json4s.JObject
+
+/** Connector for Webhooks connection with JSON data format.
+ * Its output is turned into an Event by `ConnectorUtil.toEvent`.
+ */
+private[predictionio] trait JsonConnector {
+
+  // TODO: support conversion to multiple events?
+
+  /** Convert from original JObject to Event JObject
+   *
+   * @param data original JObject received through webhooks
+   * @return Event JObject
+   */
+  def toEventJson(data: JObject): JObject
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
new file mode 100644
index 0000000..adf8791
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
@@ -0,0 +1,123 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks.exampleform
+
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+
+import org.json4s.JObject
+
+
+/** Example FormConnector with following types of webhook form data inputs:
+ *
+ * UserAction
+ *
+ *   "type"="userAction",
+ *   "userId"="as34smg4",
+ *   "event"="do_something",
+ *   "context[ip]"="24.5.68.47", // optional
+ *   "context[prop1]"="2.345", // optional
+ *   "context[prop2]"="value1", // optional
+ *   "anotherProperty1"="100",
+ *   "anotherProperty2"="optional1", // optional
+ *   "timestamp"="2015-01-02T00:30:12.984Z"
+ *
+ * UserActionItem
+ *
+ *   "type"="userActionItem",
+ *   "userId"="as34smg4",
+ *   "event"="do_something_on",
+ *   "itemId"="kfjd312bc",
+ *   "context[ip]"="1.23.4.56",
+ *   "context[prop1]"="2.345",
+ *   "context[prop2]"="value1",
+ *   "anotherPropertyA"="4.567", // optional
+ *   "anotherPropertyB"="false", // optional
+ *   "timestamp"="2015-01-15T04:20:23.567Z"
+ *
+ */
+private[predictionio] object ExampleFormConnector extends FormConnector {
+
+  /** Dispatches on the `type` field and converts the form data to Event JSON.
+   *
+   * @throws ConnectorException if `type` is missing or unknown, or if the
+   *         conversion fails (e.g. a required field is absent or a numeric
+   *         field does not parse); other exceptions are wrapped with `data`
+   *         in the message
+   */
+  override def toEventJson(data: Map[String, String]): JObject = {
+    try {
+      data.get("type") match {
+        case Some("userAction") => userActionToEventJson(data)
+        case Some("userActionItem") => userActionItemToEventJson(data)
+        case Some(x) => throw new ConnectorException(
+          s"Cannot convert unknown type ${x} to event JSON")
+        case None => throw new ConnectorException(
+          s"The field 'type' is required.")
+      }
+    } catch {
+      case e: ConnectorException => throw e
+      case e: Exception => throw new ConnectorException(
+        s"Cannot convert ${data} to event JSON. ${e.getMessage()}", e)
+    }
+  }
+
+  /** Maps a UserAction form (see the object doc) to Event API JSON. */
+  def userActionToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    // two level optional data: the context object is emitted only when at
+    // least one `context[...]` field is present, and each of its fields is
+    // optional on its own
+    val context = if (data.exists(_._1.startsWith("context["))) {
+      Some(
+        ("ip" -> data.get("context[ip]")) ~
+        ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~
+        ("prop2" -> data.get("context[prop2]"))
+      )
+    } else {
+      None
+    }
+
+    ("event" -> data("event")) ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("userId")) ~
+    ("eventTime" -> data("timestamp")) ~
+    ("properties" -> (
+      ("context" -> context) ~
+      ("anotherProperty1" -> data("anotherProperty1").toInt) ~
+      ("anotherProperty2" -> data.get("anotherProperty2"))
+    ))
+  }
+
+  /** Maps a UserActionItem form (see the object doc) to Event API JSON. */
+  def userActionItemToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    ("event" -> data("event")) ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("userId")) ~
+    ("targetEntityType" -> "item") ~
+    ("targetEntityId" -> data("itemId")) ~
+    ("eventTime" -> data("timestamp")) ~
+    ("properties" -> (
+      ("context" -> (
+        ("ip" -> data("context[ip]")) ~
+        ("prop1" -> data("context[prop1]").toDouble) ~
+        ("prop2" -> data("context[prop2]"))
+      )) ~
+      ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~
+      ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean))
+    ))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
new file mode 100644
index 0000000..2129134
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
@@ -0,0 +1,153 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks.examplejson
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+
+/** Example JsonConnector with following types of webhooks JSON input:
+ *
+ * UserAction
+ *
+ * {
+ *   "type": "userAction",
+ *   "userId": "as34smg4",
+ *   "event": "do_something",
+ *   "context": {
+ *     "ip": "24.5.68.47",
+ *     "prop1": 2.345,
+ *     "prop2": "value1"
+ *   },
+ *   "anotherProperty1": 100,
+ *   "anotherProperty2": "optional1",
+ *   "timestamp": "2015-01-02T00:30:12.984Z"
+ * }
+ *
+ * UserActionItem
+ *
+ * {
+ *   "type": "userActionItem",
+ *   "userId": "as34smg4",
+ *   "event": "do_something_on",
+ *   "itemId": "kfjd312bc",
+ *   "context": {
+ *     "ip": "1.23.4.56",
+ *     "prop1": 2.345,
+ *     "prop2": "value1"
+ *   },
+ *   "anotherPropertyA": 4.567,
+ *   "anotherPropertyB": false,
+ *   "timestamp": "2015-01-15T04:20:23.567Z"
+ * }
+ */
+private[predictionio] object ExampleJsonConnector extends JsonConnector {
+
+  implicit val json4sFormats: Formats = DefaultFormats
+
+  /** Dispatches on the `type` field and converts the payload to Event JSON.
+   *
+   * @throws ConnectorException if `type` cannot be extracted or is unknown,
+   *         or if the payload does not match the case class for its type
+   */
+  override def toEventJson(data: JObject): JObject = {
+    val common = try {
+      data.extract[Common]
+    } catch {
+      case e: Exception => throw new ConnectorException(
+        s"Cannot extract Common field from ${data}. ${e.getMessage()}", e)
+    }
+
+    try {
+      common.`type` match {
+        case "userAction" =>
+          toEventJson(common = common, userAction = data.extract[UserAction])
+        case "userActionItem" =>
+          toEventJson(common = common, userActionItem = data.extract[UserActionItem])
+        case x: String =>
+          throw new ConnectorException(
+            s"Cannot convert unknown type '${x}' to Event JSON.")
+      }
+    } catch {
+      case e: ConnectorException => throw e
+      case e: Exception => throw new ConnectorException(
+        s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e)
+    }
+  }
+
+  /** Maps an extracted UserAction to Event API JSON. */
+  def toEventJson(common: Common, userAction: UserAction): JObject = {
+    import org.json4s.JsonDSL._
+
+    // map to EventAPI JSON
+    ("event" -> userAction.event) ~
+    ("entityType" -> "user") ~
+    ("entityId" -> userAction.userId) ~
+    ("eventTime" -> userAction.timestamp) ~
+    ("properties" -> (
+      ("context" -> userAction.context) ~
+      ("anotherProperty1" -> userAction.anotherProperty1) ~
+      ("anotherProperty2" -> userAction.anotherProperty2)
+    ))
+  }
+
+  /** Maps an extracted UserActionItem to Event API JSON. */
+  def toEventJson(common: Common, userActionItem: UserActionItem): JObject = {
+    import org.json4s.JsonDSL._
+
+    // map to EventAPI JSON
+    ("event" -> userActionItem.event) ~
+    ("entityType" -> "user") ~
+    ("entityId" -> userActionItem.userId) ~
+    ("targetEntityType" -> "item") ~
+    ("targetEntityId" -> userActionItem.itemId) ~
+    ("eventTime" -> userActionItem.timestamp) ~
+    ("properties" -> (
+      ("context" -> userActionItem.context) ~
+      ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~
+      ("anotherPropertyB" -> userActionItem.anotherPropertyB)
+    ))
+  }
+
+  // Common required fields
+  case class Common(
+    `type`: String
+  )
+
+  // User Actions fields
+  case class UserAction(
+    userId: String,
+    event: String,
+    context: Option[JObject],
+    anotherProperty1: Int,
+    anotherProperty2: Option[String],
+    timestamp: String
+  )
+
+  // UserActionItem fields
+  case class UserActionItem(
+    userId: String,
+    event: String,
+    itemId: String,
+    context: JObject,
+    anotherPropertyA: Option[Double],
+    anotherPropertyB: Option[Boolean],
+    timestamp: String
+  )
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
new file mode 100644
index 0000000..abf8a7f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
@@ -0,0 +1,305 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.webhooks.mailchimp
+
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.Utils
+
+import org.json4s.JObject
+
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+private[predictionio] object MailChimpConnector extends FormConnector {
+
+  import scala.util.control.NonFatal
+
+  /** Converts a MailChimp webhook form payload into EventAPI JSON.
+    *
+    * The `type` form field selects the converter: `subscribe`,
+    * `unsubscribe`, `profile`, `upemail`, `cleaned` or `campaign`.
+    *
+    * @param data form fields posted by the MailChimp webhook
+    * @return the event as a JSON object for the Event API
+    * @throws ConnectorException if `type` is missing or unknown, or if the
+    *   selected converter fails (for example a required form field is
+    *   missing or `fired_at` cannot be parsed)
+    */
+  override def toEventJson(data: Map[String, String]): JObject = {
+    data.get("type") match {
+      case None => throw new ConnectorException(
+        "The field 'type' is required for MailChimp data.")
+      case Some(typ) =>
+        val convert: Map[String, String] => JObject = typ match {
+          case "subscribe" => subscribeToEventJson _
+          case "unsubscribe" => unsubscribeToEventJson _
+          case "profile" => profileToEventJson _
+          case "upemail" => upemailToEventJson _
+          case "cleaned" => cleanedToEventJson _
+          case "campaign" => campaignToEventJson _
+          case x => throw new ConnectorException(
+            s"Cannot convert unknown MailChimp data type ${x} to event JSON")
+        }
+        // A missing form field (Map#apply throws NoSuchElementException) or a
+        // `fired_at` parse failure would otherwise escape as a generic
+        // exception; report it as a ConnectorException instead.
+        try convert(data) catch {
+          case e: ConnectorException => throw e
+          case NonFatal(e) => throw new ConnectorException(
+            s"Cannot convert MailChimp ${typ} data to event JSON. ${e.getMessage}", e)
+        }
+    }
+  }
+
+  /** Format of MailChimp `fired_at` timestamps, e.g. `2009-03-26 21:35:57`.
+    *
+    * The pattern has no zone component, so values are interpreted in
+    * `EventValidation.defaultTimeZone`.
+    */
+  val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+    .withZone(EventValidation.defaultTimeZone)
+
+  /** Parses a MailChimp `fired_at` timestamp with [[mailChimpDateTimeFormat]]. */
+  def parseMailChimpDateTime(s: String): DateTime =
+    mailChimpDateTimeFormat.parseDateTime(s)
+
+  /** Reads `fired_at` from `data` and converts it to ISO 8601 via `Utils.dateTimeToString`. */
+  private def eventTimeOf(data: Map[String, String]) =
+    Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
+
+  /** Converts a `subscribe` payload: user `data[id]` subscribes to list `data[list_id]`.
+    *
+    * Example payload:
+    * {{{
+    * "type": "subscribe",
+    * "fired_at": "2009-03-26 21:35:57",
+    * "data[id]": "8a25ff1d98",
+    * "data[list_id]": "a6b5da1054",
+    * "data[email]": "api@mailchimp.com",
+    * "data[email_type]": "html",
+    * "data[merges][EMAIL]": "api@mailchimp.com",
+    * "data[merges][FNAME]": "MailChimp",
+    * "data[merges][LNAME]": "API",
+    * "data[merges][INTERESTS]": "Group1,Group2",
+    * "data[ip_opt]": "10.20.10.30",
+    * "data[ip_signup]": "10.20.10.30"
+    * }}}
+    * Only `data[merges][INTERESTS]` is optional; every other field is required.
+    */
+  def subscribeToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    // TODO: handle optional fields
+    ("event" -> "subscribe") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> (
+        ("EMAIL" -> data("data[merges][EMAIL]")) ~
+        ("FNAME" -> data("data[merges][FNAME]")) ~
+        ("LNAME" -> data("data[merges][LNAME]")) ~
+        ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
+      )) ~
+      ("ip_opt" -> data("data[ip_opt]")) ~
+      ("ip_signup" -> data("data[ip_signup]"))
+    ))
+  }
+
+  /** Converts an `unsubscribe` payload: user `data[id]` leaves list `data[list_id]`.
+    *
+    * `data[action]` is either "unsub" or "delete". `data[reason]` is "manual",
+    * or "abuse" when the unsubscribe was caused by a spam complaint.
+    *
+    * Example payload:
+    * {{{
+    * "type": "unsubscribe",
+    * "fired_at": "2009-03-26 21:40:57",
+    * "data[action]": "unsub",
+    * "data[reason]": "manual",
+    * "data[id]": "8a25ff1d98",
+    * "data[list_id]": "a6b5da1054",
+    * "data[email]": "api+unsub@mailchimp.com",
+    * "data[email_type]": "html",
+    * "data[merges][EMAIL]": "api+unsub@mailchimp.com",
+    * "data[merges][FNAME]": "MailChimp",
+    * "data[merges][LNAME]": "API",
+    * "data[merges][INTERESTS]": "Group1,Group2",
+    * "data[ip_opt]": "10.20.10.30",
+    * "data[campaign_id]": "cb398d21d2"
+    * }}}
+    */
+  def unsubscribeToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "unsubscribe") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("action" -> data("data[action]")) ~
+      ("reason" -> data("data[reason]")) ~
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> (
+        ("EMAIL" -> data("data[merges][EMAIL]")) ~
+        ("FNAME" -> data("data[merges][FNAME]")) ~
+        ("LNAME" -> data("data[merges][LNAME]")) ~
+        ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
+      )) ~
+      ("ip_opt" -> data("data[ip_opt]")) ~
+      ("campaign_id" -> data("data[campaign_id]"))
+    ))
+  }
+
+  /** Converts a `profile` (profile update) payload for user `data[id]`.
+    *
+    * Example payload:
+    * {{{
+    * "type": "profile",
+    * "fired_at": "2009-03-26 21:31:21",
+    * "data[id]": "8a25ff1d98",
+    * "data[list_id]": "a6b5da1054",
+    * "data[email]": "api@mailchimp.com",
+    * "data[email_type]": "html",
+    * "data[merges][EMAIL]": "api@mailchimp.com",
+    * "data[merges][FNAME]": "MailChimp",
+    * "data[merges][LNAME]": "API",
+    * "data[merges][INTERESTS]": "Group1,Group2",  (optional)
+    * "data[ip_opt]": "10.20.10.30"
+    * }}}
+    */
+  def profileToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "profile") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> (
+        ("EMAIL" -> data("data[merges][EMAIL]")) ~
+        ("FNAME" -> data("data[merges][FNAME]")) ~
+        ("LNAME" -> data("data[merges][LNAME]")) ~
+        ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
+      )) ~
+      ("ip_opt" -> data("data[ip_opt]"))
+    ))
+  }
+
+  /** Converts an `upemail` (email address change) payload; the entity is `data[new_id]`.
+    *
+    * Example payload:
+    * {{{
+    * "type": "upemail",
+    * "fired_at": "2009-03-26 22:15:09",
+    * "data[list_id]": "a6b5da1054",
+    * "data[new_id]": "51da8c3259",
+    * "data[new_email]": "api+new@mailchimp.com",
+    * "data[old_email]": "api+old@mailchimp.com"
+    * }}}
+    */
+  def upemailToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "upemail") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[new_id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("new_email" -> data("data[new_email]")) ~
+      ("old_email" -> data("data[old_email]"))
+    ))
+  }
+
+  /** Converts a `cleaned` payload; the entity is the list `data[list_id]`.
+    *
+    * `data[reason]` is "hard" (for hard bounces) or "abuse".
+    *
+    * Example payload:
+    * {{{
+    * "type": "cleaned",
+    * "fired_at": "2009-03-26 22:01:00",
+    * "data[list_id]": "a6b5da1054",
+    * "data[campaign_id]": "4fjk2ma9xd",
+    * "data[reason]": "hard",
+    * "data[email]": "api+cleaned@mailchimp.com"
+    * }}}
+    */
+  def cleanedToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    // NOTE(review): this property key is camelCase `campaignId`, while
+    // unsubscribe events use `campaign_id`. It is kept as-is because
+    // consumers may already depend on it.
+    ("event" -> "cleaned") ~
+    ("entityType" -> "list") ~
+    ("entityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("campaignId" -> data("data[campaign_id]")) ~
+      ("reason" -> data("data[reason]")) ~
+      ("email" -> data("data[email]"))
+    ))
+  }
+
+  /** Converts a `campaign` (sending status) payload; the entity is campaign `data[id]`.
+    *
+    * Example payload:
+    * {{{
+    * "type": "campaign",
+    * "fired_at": "2009-03-26 21:31:21",
+    * "data[id]": "5aa2102003",
+    * "data[subject]": "Test Campaign Subject",
+    * "data[status]": "sent",
+    * "data[reason]": "",
+    * "data[list_id]": "a6b5da1054"
+    * }}}
+    */
+  def campaignToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "campaign") ~
+    ("entityType" -> "campaign") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("subject" -> data("data[subject]")) ~
+      ("status" -> data("data[status]")) ~
+      ("reason" -> data("data[reason]"))
+    ))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
new file mode 100644
index 0000000..b7548b0
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
@@ -0,0 +1,306 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.webhooks.segmentio
+
+import org.apache.predictionio.data.webhooks.{ConnectorException, JsonConnector}
+import org.json4s._
+
+private[predictionio] object SegmentIOConnector extends JsonConnector {
+
+  import scala.util.control.NonFatal
+
+  implicit val json4sFormats: Formats = DefaultFormats
+
+  /** Converts a segment.io webhook payload into EventAPI JSON.
+    *
+    * The payload's `type` field selects which segment.io call (identify,
+    * track, alias, page, screen or group) is extracted and converted.
+    *
+    * @param data the JSON body posted by segment.io
+    * @return the event as a JSON object for the Event API
+    * @throws ConnectorException if `version` is missing or null, the common
+    *   fields cannot be extracted, `type` is unknown, or the type-specific
+    *   extraction/conversion fails
+    */
+  override def toEventJson(data: JObject): JObject = {
+    // Only the presence of a non-null `version` is checked. The value is not
+    // yet validated against a list of supported API versions.
+    if (data.values.get("version").forall(_ == null)) {
+      throw new ConnectorException("Failed to get segment.io API version.")
+    }
+
+    val common = try {
+      data.extract[Common]
+    } catch {
+      case NonFatal(e) => throw new ConnectorException(
+        s"Cannot extract Common field from $data. ${e.getMessage}", e
+      )
+    }
+
+    try {
+      common.`type` match {
+        case "identify" =>
+          toEventJson(common = common, identify = data.extract[Events.Identify])
+
+        case "track" =>
+          toEventJson(common = common, track = data.extract[Events.Track])
+
+        case "alias" =>
+          toEventJson(common = common, alias = data.extract[Events.Alias])
+
+        case "page" =>
+          toEventJson(common = common, page = data.extract[Events.Page])
+
+        case "screen" =>
+          toEventJson(common = common, screen = data.extract[Events.Screen])
+
+        case "group" =>
+          toEventJson(common = common, group = data.extract[Events.Group])
+
+        case _ =>
+          throw new ConnectorException(
+            s"Cannot convert unknown type ${common.`type`} to event JSON."
+          )
+      }
+    } catch {
+      // Already descriptive; rethrow without wrapping it a second time.
+      case e: ConnectorException => throw e
+      case NonFatal(e) =>
+        throw new ConnectorException(
+          s"Cannot convert $data to event JSON. ${e.getMessage}", e
+        )
+    }
+  }
+
+  /** Converts an `identify` call; its `traits` become an event property. */
+  def toEventJson(common: Common, identify: Events.Identify): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties = "traits" -> identify.traits
+    toJson(common, eventProperties)
+  }
+
+  /** Converts a `track` call; its `properties` and `event` name become event properties. */
+  def toEventJson(common: Common, track: Events.Track): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("properties" -> track.properties) ~
+      ("event" -> track.event)
+    toJson(common, eventProperties)
+  }
+
+  /** Converts an `alias` call; `previous_id` becomes an event property. */
+  def toEventJson(common: Common, alias: Events.Alias): JObject = {
+    import org.json4s.JsonDSL._
+    toJson(common, "previous_id" -> alias.previous_id)
+  }
+
+  /** Converts a `screen` call; `name` and `properties` become event properties. */
+  def toEventJson(common: Common, screen: Events.Screen): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("name" -> screen.name) ~
+      ("properties" -> screen.properties)
+    toJson(common, eventProperties)
+  }
+
+  /** Converts a `page` call; `name` and `properties` become event properties. */
+  def toEventJson(common: Common, page: Events.Page): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("name" -> page.name) ~
+      ("properties" -> page.properties)
+    toJson(common, eventProperties)
+  }
+
+  /** Converts a `group` call; `group_id` and `traits` become event properties. */
+  def toEventJson(common: Common, group: Events.Group): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("group_id" -> group.group_id) ~
+      ("traits" -> group.traits)
+    toJson(common, eventProperties)
+  }
+
+  /** Prepends a `properties` field (context plus `props`) to the common event fields. */
+  private def toJson(common: Common, props: JObject): JsonAST.JObject = {
+    val commonFields = commonToJson(common)
+    JObject(("properties" -> properties(common, props)) :: commonFields.obj)
+  }
+
+  /** Returns `eventProps`, prefixed with the decomposed `context` when one is present. */
+  private def properties(common: Common, eventProps: JObject): JObject = {
+    import org.json4s.JsonDSL._
+    common.context.fold(eventProps) { context =>
+      try {
+        ("context" -> Extraction.decompose(context)) ~ eventProps
+      } catch {
+        case NonFatal(e) =>
+          throw new ConnectorException(
+            s"Cannot convert $context to event JSON. ${e.getMessage}", e
+          )
+      }
+    }
+  }
+
+  private def commonToJson(common: Common): JObject =
+    commonToJson(common, common.`type`)
+
+  /** Builds the event/entity/time fields. The entity id is `user_id`, or `anonymous_id` as a fallback. */
+  private def commonToJson(common: Common, typ: String): JObject = {
+    import org.json4s.JsonDSL._
+    common.user_id.orElse(common.anonymous_id) match {
+      case Some(userId) =>
+        ("event" -> typ) ~
+        ("entityType" -> "user") ~
+        ("entityId" -> userId) ~
+        ("eventTime" -> common.timestamp)
+
+      case None =>
+        throw new ConnectorException(
+          "there was no `userId` or `anonymousId` in the common fields."
+        )
+    }
+  }
+}
+
+/** Payload shapes of the individual segment.io calls handled by SegmentIOConnector. */
+object Events {
+
+  /** `track` call: a named event with optional custom properties. */
+  private[predictionio] case class Track(
+    event: String,
+    properties: Option[JObject] = None
+  )
+
+  /** `alias` call.
+    *
+    * NOTE(review): `user_id` is required here, so an alias payload without
+    * it will likely fail extraction — confirm this is intended.
+    */
+  private[predictionio] case class Alias(previous_id: String, user_id: String)
+
+  /** `group` call: a group id with optional traits. */
+  private[predictionio] case class Group(
+    group_id: String,
+    traits: Option[JObject] = None
+  )
+
+  /** `screen` call: an optional screen name and properties. */
+  private[predictionio] case class Screen(
+    name: Option[String] = None,
+    properties: Option[JObject] = None
+  )
+
+  /** `page` call: an optional page name and properties. */
+  private[predictionio] case class Page(
+    name: Option[String] = None,
+    properties: Option[JObject] = None
+  )
+
+  /** `identify` call: user traits.
+    *
+    * NOTE(review): `user_id` is required here even though the connector
+    * otherwise falls back to `anonymous_id`; an anonymous-only identify
+    * payload will likely fail extraction — confirm.
+    */
+  private[predictionio] case class Identify(
+    user_id: String,
+    traits: Option[JObject]
+  )
+
+}
+
+/** Nested shapes of the segment.io common fields (`integrations`, `context`, ...). */
+object Common {
+
+  /** Per-integration flags.
+    *
+    * NOTE(review): `Salesforse` looks like a misspelling of "Salesforce" and
+    * may never match the payload key. It is kept unchanged to preserve the
+    * field name.
+    */
+  private[predictionio] case class Integrations(
+    All: Boolean = false,
+    Mixpanel: Boolean = false,
+    Marketo: Boolean = false,
+    Salesforse: Boolean = false
+  )
+
+  /** Client context of a call.
+    *
+    * NOTE(review): `user_agent` is snake_case, while sample payloads send
+    * `userAgent` — verify the extraction actually picks it up.
+    */
+  private[predictionio] case class Context(
+    ip: String,
+    library: Library,
+    user_agent: String,
+    app: Option[App] = None,
+    campaign: Option[Campaign] = None,
+    device: Option[Device] = None,
+    network: Option[Network] = None,
+    location: Option[Location] = None,
+    os: Option[OS] = None,
+    referrer: Option[Referrer] = None,
+    screen: Option[Screen] = None,
+    timezone: Option[String] = None
+  )
+
+  private[predictionio] case class Screen(width: Int, height: Int, density: Int)
+
+  private[predictionio] case class Referrer(id: String, `type`: String)
+
+  private[predictionio] case class OS(name: String, version: String)
+
+  private[predictionio] case class Location(
+    city: Option[String] = None,
+    country: Option[String] = None,
+    latitude: Option[Double] = None,
+    longitude: Option[Double] = None,
+    speed: Option[Int] = None
+  )
+
+  // Unlike its siblings this class has no access qualifier; left public to
+  // avoid narrowing visibility for any existing users.
+  case class Page(
+    path: String,
+    referrer: String,
+    search: String,
+    title: String,
+    url: String
+  )
+
+  private[predictionio] case class Network(
+    bluetooth: Option[Boolean] = None,
+    carrier: Option[String] = None,
+    cellular: Option[Boolean] = None,
+    wifi: Option[Boolean] = None
+  )
+
+  private[predictionio] case class Library(name: String, version: String)
+
+  private[predictionio] case class Device(
+    id: Option[String] = None,
+    advertising_id: Option[String] = None,
+    ad_tracking_enabled: Option[Boolean] = None,
+    manufacturer: Option[String] = None,
+    model: Option[String] = None,
+    name: Option[String] = None,
+    `type`: Option[String] = None,
+    token: Option[String] = None
+  )
+
+  private[predictionio] case class Campaign(
+    name: Option[String] = None,
+    source: Option[String] = None,
+    medium: Option[String] = None,
+    term: Option[String] = None,
+    content: Option[String] = None
+  )
+
+  private[predictionio] case class App(
+    name: Option[String] = None,
+    version: Option[String] = None,
+    build: Option[String] = None
+  )
+
+}
+
+/** Fields present on every segment.io call. `type` selects the call kind, and
+  * `user_id` (falling back to `anonymous_id`) identifies the user entity.
+  */
+private[predictionio] case class Common(
+  `type`: String,
+  sent_at: String,
+  timestamp: String,
+  version: String,
+  anonymous_id: Option[String] = None,
+  user_id: Option[String] = None,
+  context: Option[Common.Context] = None,
+  integrations: Option[Common.Integrations] = None
+)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala b/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
deleted file mode 100644
index 9f7a74e..0000000
--- a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Storage
-
-import akka.testkit.TestProbe
-import akka.actor.ActorSystem
-import akka.actor.Props
-
-import spray.http.HttpEntity
-import spray.http.HttpResponse
-import spray.http.ContentTypes
-import spray.httpx.RequestBuilding.Get
-
-import org.specs2.mutable.Specification
-
-/** specs2 smoke test: an EventServiceActor wired to the configured storage
- * clients answers `GET /` with 200 and `{"status":"alive"}`.
- */
-class EventServiceSpec extends Specification {
-
- val system = ActorSystem("EventServiceSpecSystem")
-
- // Event, access-key and channel clients all come from the configured Storage backend.
- val eventClient = Storage.getLEvents()
- val accessKeysClient = Storage.getMetaDataAccessKeys()
- val channelsClient = Storage.getMetaDataChannels()
-
- val eventServiceActor = system.actorOf(
- Props(
- new EventServiceActor(
- eventClient,
- accessKeysClient,
- channelsClient,
- EventServerConfig()
- )
- )
- )
-
- "GET / request" should {
- "properly produce OK HttpResponses" in {
- // Send the request through a probe and expect the exact alive response.
- val probe = TestProbe()(system)
- probe.send(eventServiceActor, Get("/"))
- probe.expectMsg(
- HttpResponse(
- 200,
- HttpEntity(
- contentType = ContentTypes.`application/json`,
- string = """{"status":"alive"}"""
- )
- )
- )
- success
- }
- }
-
- // Shut the actor system down after the examples above.
- step(system.shutdown())
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
deleted file mode 100644
index bae0f0b..0000000
--- a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-package io.prediction.data.api
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import io.prediction.data.storage._
-import org.joda.time.DateTime
-import org.specs2.mutable.Specification
-import spray.http.HttpHeaders.RawHeader
-import spray.http.{ContentTypes, HttpEntity, HttpResponse}
-import spray.httpx.RequestBuilding._
-import sun.misc.BASE64Encoder
-
-import scala.concurrent.{Future, ExecutionContext}
-
-/** specs2 spec for authentication on POST /webhooks/segmentio.json, using
- * stub event and access-key clients so no event storage is touched.
- */
-class SegmentIOAuthSpec extends Specification {
-
- val system = ActorSystem("EventServiceSpecSystem")
- sequential
- isolated
- // Stub event store: every operation succeeds; inserts return "event_id".
- val eventClient = new LEvents {
- override def init(appId: Int, channelId: Option[Int]): Boolean = true
-
- override def futureInsert(event: Event, appId: Int, channelId: Option[Int])
- (implicit ec: ExecutionContext): Future[String] =
- Future successful "event_id"
-
- override def futureFind(
- appId: Int, channelId: Option[Int], startTime: Option[DateTime],
- untilTime: Option[DateTime], entityType: Option[String],
- entityId: Option[String], eventNames: Option[Seq[String]],
- targetEntityType: Option[Option[String]],
- targetEntityId: Option[Option[String]], limit: Option[Int],
- reversed: Option[Boolean])
- (implicit ec: ExecutionContext): Future[Iterator[Event]] =
- Future successful List.empty[Event].iterator
-
- override def futureGet(eventId: String, appId: Int, channelId: Option[Int])
- (implicit ec: ExecutionContext): Future[Option[Event]] =
- Future successful None
-
- override def remove(appId: Int, channelId: Option[Int]): Boolean = true
-
- override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])
- (implicit ec: ExecutionContext): Future[Boolean] =
- Future successful true
-
- override def close(): Unit = {}
- }
- val appId = 0
- // Stub access-key store: only key "abc" resolves (to appId 0); the other
- // members return null or do nothing.
- val accessKeysClient = new AccessKeys {
- override def insert(k: AccessKey): Option[String] = null
- override def getByAppid(appid: Int): Seq[AccessKey] = null
- override def update(k: AccessKey): Unit = {}
- override def delete(k: String): Unit = {}
- override def getAll(): Seq[AccessKey] = null
-
- override def get(k: String): Option[AccessKey] =
- k match {
- case "abc" \u21d2 Some(AccessKey(k, appId, Seq.empty))
- case _ \u21d2 None
- }
- }
-
- // NOTE(review): channels still come from the real Storage configuration, unlike the stubs above.
- val channelsClient = Storage.getMetaDataChannels()
- val eventServiceActor = system.actorOf(
- Props(
- new EventServiceActor(
- eventClient,
- accessKeysClient,
- channelsClient,
- EventServerConfig()
- )
- )
- )
-
- // NOTE(review): sun.misc.BASE64Encoder is a non-public JDK API, and
- // encodeBuffer may append a line separator to its output — confirm the
- // header value is accepted as intended.
- val base64Encoder = new BASE64Encoder
-
- "Event Service" should {
-
- "reject with CredentialsRejected with invalid credentials" in {
- // Sent raw (not Base64-encoded) as the Basic credential; expect 401 "Invalid accessKey.".
- val accessKey = "abc123:"
- val probe = TestProbe()(system)
- probe.send(
- eventServiceActor,
- Post("/webhooks/segmentio.json")
- .withHeaders(
- List(
- RawHeader("Authorization", s"Basic $accessKey")
- )
- )
- )
- probe.expectMsg(
- HttpResponse(
- 401,
- HttpEntity(
- contentType = ContentTypes.`application/json`,
- string = """{"message":"Invalid accessKey."}"""
- )
- )
- )
- success
- }
-
- "reject with CredentialsMissed without credentials" in {
- // No Authorization header at all; expect 401 "Missing accessKey.".
- val probe = TestProbe()(system)
- probe.send(
- eventServiceActor,
- Post("/webhooks/segmentio.json")
- )
- probe.expectMsg(
- HttpResponse(
- 401,
- HttpEntity(
- contentType = ContentTypes.`application/json`,
- string = """{"message":"Missing accessKey."}"""
- )
- )
- )
- success
- }
-
- "process SegmentIO identity request properly" in {
- val jsonReq =
- """
- |{
- | "anonymous_id": "507f191e810c19729de860ea",
- | "channel": "browser",
- | "context": {
- | "ip": "8.8.8.8",
- | "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
- | },
- | "message_id": "022bb90c-bbac-11e4-8dfc-aa07a5b093db",
- | "timestamp": "2015-02-23T22:28:55.387Z",
- | "sent_at": "2015-02-23T22:28:55.111Z",
- | "traits": {
- | "name": "Peter Gibbons",
- | "email": "peter@initech.com",
- | "plan": "premium",
- | "logins": 5
- | },
- | "type": "identify",
- | "user_id": "97980cfea0067",
- | "version": "2"
- |}
- """.stripMargin
-
- // Basic credential "abc:" (key "abc", empty password), Base64-encoded;
- // the stub store accepts it, so the stub insert's "event_id" comes back with 201.
- val accessKey = "abc:"
- val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes)
- val probe = TestProbe()(system)
- probe.send(
- eventServiceActor,
- Post(
- "/webhooks/segmentio.json",
- HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
- ).withHeaders(
- List(
- RawHeader("Authorization", s"Basic $accessKeyEncoded")
- )
- )
- )
- probe.expectMsg(
- HttpResponse(
- 201,
- HttpEntity(
- contentType = ContentTypes.`application/json`,
- string = """{"eventId":"event_id"}"""
- )
- )
- )
- success
- }
- }
-
- // Shut the actor system down after the examples above.
- step(system.shutdown())
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
deleted file mode 100644
index e6d28b3..0000000
--- a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-
-/** specs2 spec for BiMap: lookups and inversion, plus the stringLong /
- * stringInt index builders over Set, Array and RDD input (local Spark context).
- */
-class BiMapSpec extends Specification {
-
- // NOTE(review): presumably clears Spark driver settings left by an earlier SparkContext in the same JVM — confirm.
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- val sc = new SparkContext("local[4]", "BiMapSpec test")
-
- "BiMap created with map" should {
-
- val keys = Seq(1, 4, 6)
- val orgValues = Seq(2, 5, 7)
- val org = keys.zip(orgValues).toMap
- val bi = BiMap(org)
-
- "return correct values for each key of original map" in {
- val biValues = keys.map(k => bi(k))
-
- biValues must beEqualTo(orgValues)
- }
-
- "get return Option[V]" in {
- val checkKeys = keys ++ Seq(12345)
- val biValues = checkKeys.map(k => bi.get(k))
- val expected = orgValues.map(Some(_)) ++ Seq(None)
-
- biValues must beEqualTo(expected)
- }
-
- "getOrElse return value for each key of original map" in {
- val biValues = keys.map(k => bi.getOrElse(k, -1))
-
- biValues must beEqualTo(orgValues)
- }
-
- "getOrElse return default values for invalid key" in {
- val keys = Seq(999, -1, -2)
- val defaults = Seq(1234, 5678, 987)
- val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) }
-
- biValues must beEqualTo(defaults)
- }
-
- "contains() returns true/false correctly" in {
- val checkKeys = keys ++ Seq(12345)
- val biValues = checkKeys.map(k => bi.contains(k))
- val expected = orgValues.map(_ => true) ++ Seq(false)
-
- biValues must beEqualTo(expected)
- }
-
- "same size as original map" in {
- (bi.size) must beEqualTo(org.size)
- }
-
- "take(2) returns BiMap of size 2" in {
- bi.take(2).size must beEqualTo(2)
- }
-
- "toMap contain same element as original map" in {
- (bi.toMap) must beEqualTo(org)
- }
-
- "toSeq contain same element as original map" in {
- (bi.toSeq) must containTheSameElementsAs(org.toSeq)
- }
-
- "inverse and return correct keys for each values of original map" in {
- val biKeys = orgValues.map(v => bi.inverse(v))
- biKeys must beEqualTo(keys)
- }
-
- "inverse with same size" in {
- bi.inverse.size must beEqualTo(org.size)
- }
-
- "inverse's inverse reference back to the same original object" in {
- // NOTE: reference equality
- // NOTE(review): `==` is not a reference check (`eq` would be), and the
- // Boolean result is not wrapped in a matcher.
- bi.inverse.inverse == bi
- }
- }
-
- "BiMap created with duplicated values in map" should {
- val dup = Map(1 -> 2, 4 -> 7, 6 -> 7)
- "return IllegalArgumentException" in {
- BiMap(dup) must throwA[IllegalArgumentException]
- }
- }
-
- "BiMap.stringLong and stringInt" should {
-
- "create BiMap from set of string" in {
- val keys = Set("a", "b", "foo", "bar")
- val values: Seq[Long] = Seq(0, 1, 2, 3)
-
- val bi = BiMap.stringLong(keys)
- val biValues = keys.map(k => bi(k))
-
- val biInt = BiMap.stringInt(keys)
- val valuesInt: Seq[Int] = values.map(_.toInt)
- val biIntValues = keys.map(k => biInt(k))
-
- biValues must containTheSameElementsAs(values) and
- (biIntValues must containTheSameElementsAs(valuesInt))
- }
-
- "create BiMap from Array of unique string" in {
- val keys = Array("a", "b", "foo", "bar")
- val values: Seq[Long] = Seq(0, 1, 2, 3)
-
- val bi = BiMap.stringLong(keys)
- val biValues = keys.toSeq.map(k => bi(k))
-
- val biInt = BiMap.stringInt(keys)
- val valuesInt: Seq[Int] = values.map(_.toInt)
- val biIntValues = keys.toSeq.map(k => biInt(k))
-
- biValues must containTheSameElementsAs(values) and
- (biIntValues must containTheSameElementsAs(valuesInt))
- }
-
- "not guarantee sequential index for Array with duplicated string" in {
- // The expected indices mirror zip + toMap, where a repeated key keeps its last index.
- val keys = Array("a", "b", "foo", "bar", "a", "b", "x")
- val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6)
- val values = keys.zip(dupValues).toMap.values.toSeq
-
- val bi = BiMap.stringLong(keys)
- val biValues = keys.toSet[String].map(k => bi(k))
-
- val biInt = BiMap.stringInt(keys)
- val valuesInt: Seq[Int] = values.map(_.toInt)
- val biIntValues = keys.toSet[String].map(k => biInt(k))
-
- biValues must containTheSameElementsAs(values) and
- (biIntValues must containTheSameElementsAs(valuesInt))
- }
-
- "create BiMap from RDD[String]" in {
-
- val keys = Seq("a", "b", "foo", "bar")
- val values: Seq[Long] = Seq(0, 1, 2, 3)
- val rdd = sc.parallelize(keys)
-
- val bi = BiMap.stringLong(rdd)
- val biValues = keys.map(k => bi(k))
-
- val biInt = BiMap.stringInt(rdd)
- val valuesInt: Seq[Int] = values.map(_.toInt)
- val biIntValues = keys.map(k => biInt(k))
-
- biValues must containTheSameElementsAs(values) and
- (biIntValues must containTheSameElementsAs(valuesInt))
- }
-
- "create BiMap from RDD[String] with duplicated string" in {
- // 7 keys, 5 distinct: the distinct strings are expected to get indices 0..4 in some order.
-
- val keys = Seq("a", "b", "foo", "bar", "a", "b", "x")
- val values: Seq[Long] = Seq(0, 1, 2, 3, 4)
- val rdd = sc.parallelize(keys)
-
- val bi = BiMap.stringLong(rdd)
- val biValues = keys.distinct.map(k => bi(k))
-
- val biInt = BiMap.stringInt(rdd)
- val valuesInt: Seq[Int] = values.map(_.toInt)
- val biIntValues = keys.distinct.map(k => biInt(k))
-
- biValues must containTheSameElementsAs(values) and
- (biIntValues must containTheSameElementsAs(valuesInt))
- }
- }
-
- // Stop the SparkContext after the examples above.
- step(sc.stop())
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
deleted file mode 100644
index 97e9b09..0000000
--- a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-class DataMapSpec extends Specification {
-
- "DataMap" should {
-
- val properties = DataMap("""
- {
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c", "c"],
- "prop6" : 4.56
- }
- """)
-
- "get Int data" in {
- properties.get[Int]("prop1") must beEqualTo(1)
- properties.getOpt[Int]("prop1") must beEqualTo(Some(1))
- }
-
- "get String data" in {
- properties.get[String]("prop2") must beEqualTo("value2")
- properties.getOpt[String]("prop2") must beEqualTo(Some("value2"))
- }
-
- "get List of Int data" in {
- properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3))
- properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3)))
- }
-
- "get Boolean data" in {
- properties.get[Boolean]("prop4") must beEqualTo(true)
- properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true))
- }
-
- "get List of String data" in {
- properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c"))
- properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c")))
- }
-
- "get Set of String data" in {
- properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c"))
- properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c")))
- }
-
- "get Double data" in {
- properties.get[Double]("prop6") must beEqualTo(4.56)
- properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56))
- }
-
- "get empty optional Int data" in {
- properties.getOpt[Int]("prop9999") must beEqualTo(None)
- }
-
- }
-
- "DataMap with multi-level data" should {
- val properties = DataMap("""
- {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1",
- "prop4": [1, 2, 3]
- },
- "anotherPropertyA": 4.567,
- "anotherPropertyB": false
- }
- """)
-
- "get case class data" in {
- val expected = DataMapSpec.Context(
- ip = "1.23.4.56",
- prop1 = Some(2.345),
- prop2 = Some("value1"),
- prop3 = None,
- prop4 = List(1,2,3)
- )
-
- properties.get[DataMapSpec.Context]("context") must beEqualTo(expected)
- }
-
- "get empty optional case class data" in {
- properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None)
- }
-
- "get double data" in {
- properties.get[Double]("anotherPropertyA") must beEqualTo(4.567)
- }
-
- "get boolean data" in {
- properties.get[Boolean]("anotherPropertyB") must beEqualTo(false)
- }
- }
-
- "DataMap extract" should {
-
- "extract to case class object" in {
- val properties = DataMap("""
- {
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c", "c"],
- "prop6" : 4.56
- }
- """)
-
- val result = properties.extract[DataMapSpec.BasicProperty]
- val expected = DataMapSpec.BasicProperty(
- prop1 = 1,
- prop2 = "value2",
- prop3 = List(1,2,3),
- prop4 = true,
- prop5 = List("a", "b", "c", "c"),
- prop6 = 4.56
- )
-
- result must beEqualTo(expected)
- }
-
- "extract with optional fields" in {
- val propertiesEmpty = DataMap("""{}""")
- val propertiesSome = DataMap("""
- {
- "prop1" : 1,
- "prop5" : ["a", "b", "c", "c"],
- "prop6" : 4.56
- }
- """)
-
- val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty]
- val expectedEmpty = DataMapSpec.OptionProperty(
- prop1 = None,
- prop2 = None,
- prop3 = None,
- prop4 = None,
- prop5 = None,
- prop6 = None
- )
-
- val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty]
- val expectedSome = DataMapSpec.OptionProperty(
- prop1 = Some(1),
- prop2 = None,
- prop3 = None,
- prop4 = None,
- prop5 = Some(List("a", "b", "c", "c")),
- prop6 = Some(4.56)
- )
-
- resultEmpty must beEqualTo(expectedEmpty)
- resultSome must beEqualTo(expectedSome)
- }
-
- "extract to multi-level object" in {
- val properties = DataMap("""
- {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1",
- "prop4": [1, 2, 3]
- },
- "anotherPropertyA": 4.567,
- "anotherPropertyB": false
- }
- """)
-
- val result = properties.extract[DataMapSpec.MultiLevelProperty]
- val expected = DataMapSpec.MultiLevelProperty(
- context = DataMapSpec.Context(
- ip = "1.23.4.56",
- prop1 = Some(2.345),
- prop2 = Some("value1"),
- prop3 = None,
- prop4 = List(1,2,3)
- ),
- anotherPropertyA = 4.567,
- anotherPropertyB = false
- )
-
- result must beEqualTo(expected)
- }
-
- }
-}
-
-object DataMapSpec {
-
- // define this case class inside object to avoid case class name conflict with other tests
- case class Context(
- ip: String,
- prop1: Option[Double],
- prop2: Option[String],
- prop3: Option[Int],
- prop4: List[Int]
- )
-
- case class BasicProperty(
- prop1: Int,
- prop2: String,
- prop3: List[Int],
- prop4: Boolean,
- prop5: List[String],
- prop6: Double
- )
-
- case class OptionProperty(
- prop1: Option[Int],
- prop2: Option[String],
- prop3: Option[List[Int]],
- prop4: Option[Boolean],
- prop5: Option[List[String]],
- prop6: Option[Double]
- )
-
- case class MultiLevelProperty(
- context: Context,
- anotherPropertyA: Double,
- anotherPropertyB: Boolean
- )
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
deleted file mode 100644
index 77a66d5..0000000
--- a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-
-import org.joda.time.DateTime
-
-class LEventAggregatorSpec extends Specification with TestEvents {
-
- "LEventAggregator.aggregateProperties()" should {
-
- "aggregate two entities' properties as DataMap correctly" in {
- val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
- val result: Map[String, DataMap] =
- LEventAggregator.aggregateProperties(events.toIterator)
-
- val expected = Map(
- "u1" -> DataMap(u1),
- "u2" -> DataMap(u2)
- )
-
- result must beEqualTo(expected)
- }
-
- "aggregate two entities' properties as PropertyMap correctly" in {
- val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
- val result: Map[String, PropertyMap] =
- LEventAggregator.aggregateProperties(events.toIterator)
-
- val expected = Map(
- "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- result must beEqualTo(expected)
- }
-
-
- "aggregate deleted entity correctly" in {
- val events = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)
-
- val result = LEventAggregator.aggregateProperties(events.toIterator)
- val expected = Map(
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- result must beEqualTo(expected)
- }
- }
-
-
- "LEventAggregator.aggregatePropertiesSingle()" should {
-
- "aggregate single entity properties as DataMap correctly" in {
- val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
- val eventsIt = events.toIterator
-
- val result: Option[DataMap] = LEventAggregator
- .aggregatePropertiesSingle(eventsIt)
- val expected = DataMap(u1)
-
- result must beEqualTo(Some(expected))
- }
-
- "aggregate single entity properties as PropertyMap correctly" in {
- val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
- val eventsIt = events.toIterator
-
- val result: Option[PropertyMap] = LEventAggregator
- .aggregatePropertiesSingle(eventsIt)
- val expected = PropertyMap(u1, u1BaseTime, u1LastTime)
-
- result must beEqualTo(Some(expected))
- }
-
- "aggregate deleted entity correctly" in {
- // put the delete event in the middle
- val events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5)
- val eventsIt = events.toIterator
-
- val result = LEventAggregator.aggregatePropertiesSingle(eventsIt)
-
- result must beEqualTo(None)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
deleted file mode 100644
index 5b38cdb..0000000
--- a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-class LEventsSpec extends Specification with TestEvents {
- def is = s2"""
-
- PredictionIO Storage LEvents Specification
-
- Events can be implemented by:
- - HBLEvents ${hbEvents}
- - JDBCLEvents ${jdbcLEvents}
-
- """
-
- def hbEvents = sequential ^ s2"""
-
- HBLEvents should
- - behave like any LEvents implementation ${events(hbDO)}
- - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
- """
-
- def jdbcLEvents = sequential ^ s2"""
-
- JDBCLEvents should
- - behave like any LEvents implementation ${events(jdbcDO)}
-
- """
-
- val appId = 1
-
- def events(eventClient: LEvents) = sequential ^ s2"""
-
- init default ${initDefault(eventClient)}
- insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
- insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
- insert and delete by ID ${insertAndDelete(eventClient)}
- insert test user events ${insertTestUserEvents(eventClient)}
- find user events ${findUserEvents(eventClient)}
- aggregate user properties ${aggregateUserProperties(eventClient)}
- aggregate one user properties ${aggregateOneUserProperties(eventClient)}
- aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
- init channel ${initChannel(eventClient)}
- insert 2 events to channel ${insertChannel(eventClient)}
- insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
- find events from channel ${findChannel(eventClient)}
- remove default ${removeDefault(eventClient)}
- remove channel ${removeChannel(eventClient)}
-
- """
-
- val dbName = "test_pio_storage_events_" + hashCode
- def hbDO = Storage.getDataObject[LEvents](
- StorageTestUtils.hbaseSourceName,
- dbName
- )
-
- def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
-
- def initDefault(eventClient: LEvents) = {
- eventClient.init(appId)
- }
-
- def insertAndGetEvents(eventClient: LEvents) = {
-
- // events from TestEvents trait
- val listOfEvents = List(r1,r2,r3)
-
- val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
- val insertedEventId: List[String] = insertResp
-
- val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
- .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
- val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
- val getEvents = getResp
-
- insertedEvent must containTheSameElementsAs(getEvents)
- }
-
- def insertAndGetTimezone(eventClient: LEvents) = {
- val listOfEvents = List(tz1, tz2, tz3)
-
- val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
- val insertedEventId: List[String] = insertResp
-
- val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
- .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
- val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
- val getEvents = getResp
-
- insertedEvent must containTheSameElementsAs(getEvents)
- }
-
- def insertAndDelete(eventClient: LEvents) = {
- val eventId = eventClient.insert(r2, appId)
-
- val resultBefore = eventClient.get(eventId, appId)
-
- val expectedBefore = r2.copy(eventId = Some(eventId))
-
- val deleteStatus = eventClient.delete(eventId, appId)
-
- val resultAfter = eventClient.get(eventId, appId)
-
- (resultBefore must beEqualTo(Some(expectedBefore))) and
- (deleteStatus must beEqualTo(true)) and
- (resultAfter must beEqualTo(None))
- }
-
- def insertTestUserEvents(eventClient: LEvents) = {
- // events from TestEvents trait
- val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
- listOfEvents.map{ eventClient.insert(_, appId) }
-
- success
- }
-
- def findUserEvents(eventClient: LEvents) = {
-
- val results: List[Event] = eventClient.find(
- appId = appId,
- entityType = Some("user"))
- .toList
- .map(e => e.copy(eventId = None)) // ignore eventID
-
- // same events in insertTestUserEvents
- val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
- results must containTheSameElementsAs(expected)
- }
-
- def aggregateUserProperties(eventClient: LEvents) = {
-
- val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
- appId = appId,
- entityType = "user")
-
- val expected = Map(
- "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- result must beEqualTo(expected)
- }
-
- def aggregateOneUserProperties(eventClient: LEvents) = {
- val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
- appId = appId,
- entityType = "user",
- entityId = "u1")
-
- val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
-
- result must beEqualTo(expected)
- }
-
- def aggregateNonExistentUserProperties(eventClient: LEvents) = {
- val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
- appId = appId,
- entityType = "user",
- entityId = "u999999")
-
- result must beEqualTo(None)
- }
-
- val channelId = 12
-
- def initChannel(eventClient: LEvents) = {
- eventClient.init(appId, Some(channelId))
- }
-
- def insertChannel(eventClient: LEvents) = {
-
- // events from TestEvents trait
- val listOfEvents = List(r4,r5)
-
- listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
-
- success
- }
-
- def insertAndDeleteChannel(eventClient: LEvents) = {
-
- val eventId = eventClient.insert(r2, appId, Some(channelId))
-
- val resultBefore = eventClient.get(eventId, appId, Some(channelId))
-
- val expectedBefore = r2.copy(eventId = Some(eventId))
-
- val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
-
- val resultAfter = eventClient.get(eventId, appId, Some(channelId))
-
- (resultBefore must beEqualTo(Some(expectedBefore))) and
- (deleteStatus must beEqualTo(true)) and
- (resultAfter must beEqualTo(None))
- }
-
- def findChannel(eventClient: LEvents) = {
-
- val results: List[Event] = eventClient.find(
- appId = appId,
- channelId = Some(channelId)
- )
- .toList
- .map(e => e.copy(eventId = None)) // ignore eventId
-
- // same events in insertChannel
- val expected = List(r4, r5)
-
- results must containTheSameElementsAs(expected)
- }
-
- def removeDefault(eventClient: LEvents) = {
- eventClient.remove(appId)
- }
-
- def removeChannel(eventClient: LEvents) = {
- eventClient.remove(appId, Some(channelId))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
deleted file mode 100644
index b00ec7c..0000000
--- a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-class PEventAggregatorSpec extends Specification with TestEvents {
-
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
- "PEventAggregator" should {
-
- "aggregate two entities' properties as DataMap/PropertyMap correctly" in {
- val events = sc.parallelize(Seq(
- u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2))
-
- val users = PEventAggregator.aggregateProperties(events)
-
- val userMap = users.collectAsMap.toMap
- val expectedDM = Map(
- "u1" -> DataMap(u1),
- "u2" -> DataMap(u2)
- )
-
- val expectedPM = Map(
- "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- userMap must beEqualTo(expectedDM)
- userMap must beEqualTo(expectedPM)
- }
-
- "aggregate deleted entity correctly" in {
- // put the delete event in middle
- val events = sc.parallelize(Seq(
- u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2))
-
- val users = PEventAggregator.aggregateProperties(events)
-
- val userMap = users.collectAsMap.toMap
- val expectedPM = Map(
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- userMap must beEqualTo(expectedPM)
- }
-
- }
-
- step(sc.stop())
-}