You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:39 UTC

[08/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
new file mode 100644
index 0000000..5bd7478
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala
@@ -0,0 +1,200 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+import scala.language.implicitConversions
+
+import scala.concurrent.ExecutionContext.Implicits.global // TODO
+
+@deprecated("Use LEvents or LEventStore instead.", "0.9.2")
+object ViewPredicates {
+  /** Pass-through predicate handed out whenever an optional criterion is absent. */
+  private val acceptAll: Event => Boolean = _ => true
+
+  /** Start-time predicate for an optional bound.
+    *
+    * @param startTimeOpt bound to apply, or `None` to accept every event
+    * @return predicate over events
+    */
+  def getStartTimePredicate(startTimeOpt: Option[DateTime])
+  : (Event => Boolean) = startTimeOpt match {
+    case Some(startTime) => getStartTimePredicate(startTime)
+    case None => acceptAll
+  }
+
+  /** Accepts events whose time is neither before nor equal to `startTime`,
+    * i.e. the bound itself is excluded.
+    *
+    * @param startTime exclusive lower bound on the event time
+    */
+  def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = { event =>
+    !event.eventTime.isBefore(startTime) && !event.eventTime.isEqual(startTime)
+  }
+
+  /** Until-time predicate for an optional bound.
+    *
+    * @param untilTimeOpt bound to apply, or `None` to accept every event
+    * @return predicate over events
+    */
+  def getUntilTimePredicate(untilTimeOpt: Option[DateTime])
+  : (Event => Boolean) = untilTimeOpt match {
+    case Some(untilTime) => getUntilTimePredicate(untilTime)
+    case None => acceptAll
+  }
+
+  /** Accepts events whose time is before `untilTime`.
+    *
+    * @param untilTime exclusive upper bound on the event time
+    */
+  def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = { event =>
+    event.eventTime.isBefore(untilTime)
+  }
+
+  /** Entity-type predicate for an optional criterion.
+    *
+    * @param entityTypeOpt entity type to match, or `None` to accept every event
+    * @return predicate over events
+    */
+  def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean)
+  = entityTypeOpt match {
+    case Some(entityType) => getEntityTypePredicate(entityType)
+    case None => acceptAll
+  }
+
+  /** Accepts events whose `entityType` equals the given one. */
+  def getEntityTypePredicate(entityType: String): (Event => Boolean) = { e =>
+    e.entityType == entityType
+  }
+
+  /** Event-name predicate for an optional criterion.
+    *
+    * @param eventOpt event name to match, or `None` to accept every event
+    * @return predicate over events
+    */
+  def getEventPredicate(eventOpt: Option[String]): (Event => Boolean)
+  = eventOpt match {
+    case Some(event) => getEventPredicate(event)
+    case None => acceptAll
+  }
+
+  /** Accepts events whose `event` name equals the given one. */
+  def getEventPredicate(event: String): (Event => Boolean) = { e =>
+    e.event == event
+  }
+}
+
+@deprecated("Use LEvents instead.", "0.9.2")
+object ViewAggregators {
+  /** Builds the fold step that applies one event to an entity's accumulated
+    * properties.
+    *
+    * The accumulator is `None` while the entity has no properties:
+    *  - `$set` merges the event's properties into the accumulator, creating it
+    *    when absent;
+    *  - `$unset` removes the event's property keys from an existing accumulator
+    *    and leaves an absent one absent;
+    *  - `$delete` resets the accumulator to `None`;
+    *  - any other event leaves the accumulator unchanged.
+    *
+    * @return function taking (accumulator, event) to the next accumulator
+    */
+  def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = {
+    (current, event) => event.event match {
+      case "$set" =>
+        Some(current.fold(event.properties)(_ ++ event.properties))
+      case "$unset" =>
+        current.map(_ -- event.properties.keySet)
+      case "$delete" =>
+        None
+      case _ =>
+        current // do nothing for others
+    }
+  }
+}
+
+@deprecated("Use LEvents instead.", "0.9.2")
+object EventSeq {
+  // Need to
+  // >>> import scala.language.implicitConversions
+  // to enable implicit conversion. Only import in the code where this is
+  // necessary to avoid confusion.
+
+  /** Unwraps an [[EventSeq]] so that `List` operations can be used on it. */
+  implicit def eventSeqToList(es: EventSeq): List[Event] = es.events
+
+  /** Wraps a plain list of events into an [[EventSeq]]. */
+  implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l)
+}
+
+
+/** Immutable list of events with filtering and per-entity aggregation helpers.
+  *
+  * @param events the wrapped events
+  */
+@deprecated("Use LEvents instead.", "0.9.2")
+class EventSeq(val events: List[Event]) {
+  /** Keeps the events that satisfy every supplied criterion; a criterion left
+    * as `None` is not applied.
+    *
+    * @param eventOpt event name to match
+    * @param entityTypeOpt entity type to match
+    * @param startTimeOpt start bound, as defined by
+    *                     `ViewPredicates.getStartTimePredicate`
+    * @param untilTimeOpt until bound, as defined by
+    *                     `ViewPredicates.getUntilTimePredicate`
+    * @return the matching events, in their original order
+    */
+  def filter(
+    eventOpt: Option[String] = None,
+    entityTypeOpt: Option[String] = None,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None): EventSeq = {
+
+    new EventSeq(
+      events
+      .filter(ViewPredicates.getEventPredicate(eventOpt))
+      .filter(ViewPredicates.getStartTimePredicate(startTimeOpt))
+      .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt))
+      .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt)))
+  }
+
+  /** Keeps the events accepted by `p`. */
+  def filter(p: (Event => Boolean)): EventSeq = new EventSeq(events.filter(p))
+
+  /** Folds the events of each entity in ascending event-time order.
+    *
+    * @param init initial accumulator, used independently for every entity
+    * @param op fold step combining the accumulator with the next event
+    * @tparam T accumulator type
+    * @return map from entity ID to the final accumulator of that entity
+    */
+  def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T)
+  : Map[String, T] = {
+    // Use a strict map() rather than mapValues(): mapValues returns a lazy view
+    // (and toMap on it is a no-op), so the sort and fold would be re-run on
+    // every lookup and the result would not be serializable (SI-7005).
+    events
+    .groupBy( _.entityId )
+    .map { case (entityId, entityEvents) =>
+      (entityId, entityEvents.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
+    }
+  }
+
+
+}
+
+
+/** Batch view over the events of one app, read eagerly from the local event
+  * store.
+  *
+  * @param appId ID of the app whose events are read
+  * @param startTime start-time bound passed to the event store query
+  * @param untilTime until-time bound passed to the event store query
+  */
+@deprecated("Use LEventStore instead.", "0.9.2")
+class LBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime]) {
+
+  @transient lazy val eventsDb = Storage.getLEvents()
+
+  // Materialized on first access; every later use works on this in-memory list.
+  @transient lazy val _events = eventsDb.find(
+    appId = appId,
+    startTime = startTime,
+    untilTime = untilTime).toList
+
+  @transient lazy val events: EventSeq = new EventSeq(_events)
+
+  /** Aggregate event data
+    *
+    * Only special events (as decided by `EventValidation.isSpecialEvents`) of
+    * the given entity type are replayed, per entity and in event-time order.
+    * Entities whose aggregated result is empty are left out.
+    *
+    * @param entityType only aggregate event with entityType
+    * @param startTimeOpt if specified, only aggregate event after (inclusive)
+    * startTimeOpt
+    * @param untilTimeOpt if specified, only aggregate event until (exclusive)
+    * untilTimeOpt
+    * @return map from entity ID to its aggregated properties
+    */
+  def aggregateProperties(
+      entityType: String,
+      startTimeOpt: Option[DateTime] = None,
+      untilTimeOpt: Option[DateTime] = None
+      ): Map[String, DataMap] = {
+
+    // The window is checked here rather than through ViewPredicates because its
+    // start-time predicate excludes the bound itself, whereas this method
+    // documents an inclusive start.
+    val inWindow: Event => Boolean = { e =>
+      startTimeOpt.forall(start => !e.eventTime.isBefore(start)) &&
+        untilTimeOpt.forall(until => e.eventTime.isBefore(until))
+    }
+
+    events
+    .filter(entityTypeOpt = Some(entityType))
+    .filter(e => EventValidation.isSpecialEvents(e.event))
+    .filter(inWindow)
+    .aggregateByEntityOrdered[Option[DataMap]](
+      init = None,
+      op = ViewAggregators.getDataMapAggregator())
+    // collect builds a strict Map and drops the empty results without
+    // calling Option.get.
+    .collect { case (entityId, Some(properties)) => (entityId, properties) }
+
+  }
+
+  /*
+  def aggregateByEntityOrdered[T](
+    predicate: Event => Boolean,
+    init: T,
+    op: (T, Event) => T): Map[String, T] = {
+
+    _events
+      .filter( predicate(_) )
+      .groupBy( _.entityId )
+      .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
+      .toMap
+
+  }
+  */
+
+  /*
+  def groupByEntityOrdered[T](
+    predicate: Event => Boolean,
+    map: Event => T): Map[String, Seq[T]] = {
+
+    _events
+      .filter( predicate(_) )
+      .groupBy( _.entityId )
+      .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) )
+      .toMap
+  }
+  */
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
new file mode 100644
index 0000000..6c75402
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/PBatchView.scala
@@ -0,0 +1,209 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.hbase.HBPEvents
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import org.joda.time.DateTime
+
+import org.json4s.JValue
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+/** A property value paired with the time at which it was set.
+  *
+  * @param d the property value
+  * @param t time the value was set, in epoch milliseconds
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] case class PropTime(d: JValue, t: Long) extends Serializable
+
+/** Accumulated effect of the `$set` events seen so far for one entity.
+  *
+  * @param fields latest value and set time per property name
+  * @param t last set time. Note: fields could be empty with valid set time
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] case class SetProp (
+  fields: Map[String, PropTime],
+  t: Long) extends Serializable {
+
+  /** Merges two accumulations. For a property present on both sides the value
+    * with the later set time is kept; on a tie the value from `that` wins.
+    * The resulting set time is the later of the two.
+    */
+  def ++ (that: SetProp): SetProp = {
+    val combinedFields = that.fields.foldLeft(fields) { case (acc, (k, thatData)) =>
+      acc.get(k) match {
+        // only keep the value with latest time
+        case Some(thisData) if thisData.t > thatData.t => acc
+        case _ => acc + (k -> thatData)
+      }
+    }
+
+    SetProp(
+      fields = combinedFields,
+      // keep the latest set time
+      t = math.max(this.t, that.t)
+    )
+  }
+}
+
+/** Accumulated effect of the `$unset` events seen so far for one entity.
+  *
+  * @param fields latest unset time (epoch milliseconds) per property name
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
+  /** Merges two accumulations, keeping the later unset time for every
+    * property present on both sides.
+    */
+  def ++ (that: UnsetProp): UnsetProp = {
+    val combinedFields = that.fields.foldLeft(fields) { case (acc, (k, thatTime)) =>
+      // only keep the latest time
+      acc.updated(k, acc.get(k).fold(thatTime)(thisTime => math.max(thisTime, thatTime)))
+    }
+
+    UnsetProp(
+      fields = combinedFields
+    )
+  }
+}
+
+/** Record of a `$delete` event for one entity.
+  *
+  * @param t time of the delete, in epoch milliseconds
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] case class DeleteEntity (t: Long) extends Serializable {
+  /** Keeps the later of two deletes; on a tie `that` is returned. */
+  def ++ (that: DeleteEntity): DeleteEntity = {
+    if (this.t > that.t) this else that
+  }
+}
+
+/** Mergeable summary of the special events (`$set`, `$unset`, `$delete`) seen
+  * for one entity.
+  *
+  * @param setProp accumulated `$set` effect, if any `$set` was seen
+  * @param unsetProp accumulated `$unset` effect, if any `$unset` was seen
+  * @param deleteEntity latest `$delete`, if any was seen
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] case class EventOp (
+  setProp: Option[SetProp] = None,
+  unsetProp: Option[UnsetProp] = None,
+  deleteEntity: Option[DeleteEntity] = None
+) extends Serializable {
+
+  /** Merges two summaries component by component; a component present on only
+    * one side is carried over unchanged.
+    */
+  def ++ (that: EventOp): EventOp = {
+    EventOp(
+      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
+      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
+      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
+    )
+  }
+
+  /** Resolves the summary into the entity's current properties.
+    *
+    * @return `None` when nothing was ever set, or when the latest delete is at
+    *         or after the latest set; otherwise the set properties minus those
+    *         unset or deleted at or after the time they were set
+    */
+  def toDataMap(): Option[DataMap] = {
+    setProp.flatMap { set =>
+
+      // A key may be unset without ever having been set; look it up with get
+      // so that such a key is ignored instead of raising
+      // NoSuchElementException.
+      val unsetKeys: Set[String] = unsetProp.map( unset =>
+        unset.fields.filter { case (k, unsetTime) =>
+          set.fields.get(k).exists(prop => unsetTime >= prop.t)
+        }.keySet
+      ).getOrElse(Set())
+
+      val combinedFields = deleteEntity.map { delete =>
+        if (delete.t >= set.t) {
+          None
+        } else {
+          // properties set no later than the delete are removed by it
+          val deleteKeys: Set[String] = set.fields
+            .filter { case (k, PropTime(kv, t)) =>
+              (delete.t >= t)
+            }.keySet
+          Some(set.fields -- unsetKeys -- deleteKeys)
+        }
+      }.getOrElse{
+        Some(set.fields -- unsetKeys)
+      }
+
+      // Note: mapValues() doesn't return concrete Map and causes
+      // NotSerializableException issue. Use map(identity) to work around this.
+      // see https://issues.scala-lang.org/browse/SI-7005
+      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
+    }
+  }
+
+}
+
+private[predictionio] object EventOp {
+  /** Builds the summary of a single event, stamped with the event time in
+    * epoch milliseconds. Events other than `$set`, `$unset` and `$delete`
+    * yield an empty summary.
+    */
+  def apply(e: Event): EventOp = {
+    val t = e.eventTime.getMillis
+    e.event match {
+      case "$set" => {
+        // strict map() instead of mapValues() keeps the result serializable
+        val fields = e.properties.fields.map { case (k, jv) => (k, PropTime(jv, t)) }
+
+        EventOp(
+          setProp = Some(SetProp(fields = fields, t = t))
+        )
+      }
+      case "$unset" => {
+        val fields = e.properties.fields.map { case (k, _) => (k, t) }
+        EventOp(
+          unsetProp = Some(UnsetProp(fields = fields))
+        )
+      }
+      case "$delete" => {
+        EventOp(
+          deleteEntity = Some(DeleteEntity(t))
+        )
+      }
+      case _ => {
+        EventOp()
+      }
+    }
+  }
+}
+
+/** Batch view over the events of one app, read as an RDD from the parallel
+  * event store.
+  *
+  * @param appId ID of the app whose events are read
+  * @param startTime start-time bound passed to the event store query
+  * @param untilTime until-time bound passed to the event store query
+  * @param sc Spark context used to create the RDD
+  */
+@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
+class PBatchView(
+  val appId: Int,
+  val startTime: Option[DateTime],
+  val untilTime: Option[DateTime],
+  val sc: SparkContext) {
+
+  // NOTE: parallel Events DB interface
+  @transient lazy val eventsDb = Storage.getPEvents()
+
+  @transient lazy val _events: RDD[Event] =
+    eventsDb.getByAppIdAndTimeAndEntity(
+      appId = appId,
+      startTime = startTime,
+      untilTime = untilTime,
+      entityType = None,
+      entityId = None)(sc)
+
+  // TODO: change to use EventSeq?
+  @transient lazy val events: RDD[Event] = _events
+
+  /** Aggregates the special events of one entity type into per-entity
+    * properties. Entities whose aggregated result is empty are left out.
+    *
+    * @param entityType only aggregate events with this entity type
+    * @param startTimeOpt if specified, only aggregate events at or after this
+    *                     time
+    * @param untilTimeOpt if specified, only aggregate events before this time
+    * @return pairs of entity ID and aggregated properties
+    */
+  def aggregateProperties(
+    entityType: String,
+    startTimeOpt: Option[DateTime] = None,
+    untilTimeOpt: Option[DateTime] = None
+  ): RDD[(String, DataMap)] = {
+
+    _events
+      .filter { e =>
+        (e.entityType == entityType) &&
+          EventValidation.isSpecialEvents(e.event) &&
+          // the optional window was previously accepted but never applied
+          startTimeOpt.forall(start => !e.eventTime.isBefore(start)) &&
+          untilTimeOpt.forall(until => e.eventTime.isBefore(until))
+      }
+      .map( e => (e.entityId, EventOp(e) ))
+      .aggregateByKey[EventOp](EventOp())(
+        // within same partition
+        seqOp = { case (u, v) => u ++ v },
+        // across partition
+        combOp = { case (accu, u) => accu ++ u }
+      )
+      .mapValues(_.toDataMap)
+      // drop the empty results without calling Option.get
+      .collect { case (entityId, Some(properties)) => (entityId, properties) }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
new file mode 100644
index 0000000..eba3276
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala
@@ -0,0 +1,94 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.view
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Storage
+
+import scala.concurrent.ExecutionContext.Implicits.global // TODO
+
+import grizzled.slf4j.Logger
+import org.joda.time.DateTime
+
+import scala.language.implicitConversions
+
+/** Manual smoke test that queries the local event store with fixed arguments. */
+class TestHBLEvents() {
+  @transient lazy val eventsDb = Storage.getLEvents()
+
+  /** Prints the events of entity "3" of type "pio_user" in app 1. */
+  def run(): Unit = {
+    val found = eventsDb
+      .find(
+        appId = 1,
+        startTime = None,
+        untilTime = None,
+        entityType = Some("pio_user"),
+        entityId = Some("3"))
+      .toList
+    println(found)
+  }
+}
+
+/** Manual smoke test that prints every event of one app through LBatchView.
+  *
+  * @param appId ID of the app whose events are printed
+  */
+class TestSource(val appId: Int) {
+  @transient lazy val logger = Logger[this.type]
+  // no time bounds: the view covers all events of the app
+  @transient lazy val batchView = new LBatchView(appId, None, None)
+
+  /** Prints the view's events. */
+  def run(): Unit = println(batchView.events)
+}
+
+/** Entry point that runs the [[TestHBLEvents]] smoke test. */
+object QuickTest {
+
+  // Explicit `: Unit =` replaces the deprecated procedure syntax.
+  def main(args: Array[String]): Unit = {
+    val t = new TestHBLEvents()
+    t.run()
+
+    // val ts = new TestSource(args(0).toInt)
+    // ts.run()
+  }
+}
+
+/** Manual check of event filtering and property aggregation against app 9. */
+object TestEventTime {
+  @transient lazy val batchView = new LBatchView(9, None, None)
+
+  // implicit def back2list(es: EventSeq) = es.events
+
+  // Explicit `: Unit =` replaces the deprecated procedure syntax.
+  def main(args: Array[String]): Unit = {
+    val e = batchView.events.filter(
+      eventOpt = Some("rate"),
+      startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0))
+      // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0))
+    )
+      // untilTimeOpt = Some(new DateTime(2000, 1, 1, 0, 0)))
+
+    // EventSeq is used as a List here through the implicit conversion
+    // defined in the EventSeq companion object.
+    e.foreach { println }
+    println()
+    println()
+    println()
+    val u = batchView.aggregateProperties("pio_item")
+    u.foreach { println }
+    println()
+    println()
+    println()
+
+    // val l: Seq[Event] = e
+    val l = e.map { _.entityId }
+    l.foreach { println }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
new file mode 100644
index 0000000..ee47a9c
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorException.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks
+
+/** Webhooks Connector Exception
+  *
+  * @param message the detail message
+  * @param cause the cause
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] class ConnectorException(message: String, cause: Throwable)
+  extends Exception(message, cause) {
+
+  /** Webhooks Connector Exception with cause being set to null
+    *
+    * @param message the detail message
+    */
+  def this(message: String) = this(message, null)
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
new file mode 100644
index 0000000..40feb98
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/ConnectorUtil.scala
@@ -0,0 +1,46 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks
+
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventJson4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+import org.json4s.native.Serialization.read
+import org.json4s.native.Serialization.write
+
+
+/** Helpers that turn the output of a webhooks connector into an `Event`. */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] object ConnectorUtil {
+
+  implicit val eventJson4sFormats: Formats = DefaultFormats +
+    new EventJson4sSupport.APISerializer
+
+  // intentionally use EventJson4sSupport.APISerializer to convert
+  // from JSON to Event object. Don't allow connector directly create
+  // Event object so that the Event object formation is consistent
+  // by enforcing JSON format
+
+  /** Converts JSON webhooks data to an `Event`.
+    *
+    * @param connector connector producing the event JSON
+    * @param data original JSON data received through webhooks
+    * @return the event read back from the connector's event JSON
+    */
+  def toEvent(connector: JsonConnector, data: JObject): Event = {
+    read[Event](write(connector.toEventJson(data)))
+  }
+
+  /** Converts form-submission webhooks data to an `Event`.
+    *
+    * @param connector connector producing the event JSON
+    * @param data key-value pairs received through webhooks
+    * @return the event read back from the connector's event JSON
+    */
+  def toEvent(connector: FormConnector, data: Map[String, String]): Event = {
+    read[Event](write(connector.toEventJson(data)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
new file mode 100644
index 0000000..dd04a21
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/FormConnector.scala
@@ -0,0 +1,32 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks
+
+import org.json4s.JObject
+
+/** Connector for Webhooks connection with Form submission data format
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] trait FormConnector {
+
+  // TODO: support conversion to multiple events?
+
+  /** Convert from original Form submission data to Event JObject
+    * @param data Map of key-value pairs in String type received through webhooks
+    * @return Event JObject
+   */
+  def toEventJson(data: Map[String, String]): JObject
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
new file mode 100644
index 0000000..eda8059
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/JsonConnector.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks
+
+import org.json4s.JObject
+
+/** Connector for Webhooks connection */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] trait JsonConnector {
+
+  // TODO: support conversion to multiple events?
+
+  /** Convert from original JObject to Event JObject
+    * @param data original JObject received through webhooks
+    * @return Event JObject
+   */
+  def toEventJson(data: JObject): JObject
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
new file mode 100644
index 0000000..adf8791
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnector.scala
@@ -0,0 +1,123 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.exampleform
+
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+
+import org.json4s.JObject
+
+
+/** Example FormConnector with following types of webhook form data inputs:
+  *
+  * UserAction
+  *
+  *   "type"="userAction"
+  *   "userId"="as34smg4",
+  *   "event"="do_something",
+  *   "context[ip]"="24.5.68.47", // optional
+  *   "context[prop1]"="2.345", // optional
+  *   "context[prop2]"="value1" // optional
+  *   "anotherProperty1"="100",
+  *   "anotherProperty2"="optional1", // optional
+  *   "timestamp"="2015-01-02T00:30:12.984Z"
+  *
+  * UserActionItem
+  *
+  *   "type"="userActionItem"
+  *   "userId"="as34smg4",
+  *   "event"="do_something_on",
+  *   "itemId"="kfjd312bc",
+  *   "context[ip]"="1.23.4.56",
+  *   "context[prop1]"="2.345",
+  *   "context[prop2]"="value1",
+  *   "anotherPropertyA"="4.567", // optional
+  *   "anotherPropertyB"="false", // optional
+  *   "timestamp"="2015-01-15T04:20:23.567Z"
+  *
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] object ExampleFormConnector extends FormConnector {
+
+  /** Dispatches on the "type" field and converts the form data to event JSON.
+    *
+    * @param data key-value pairs received through webhooks
+    * @return Event JObject
+    * @throws ConnectorException if "type" is missing or unknown, or if the
+    *                            conversion fails (the failure is kept as cause)
+    */
+  override
+  def toEventJson(data: Map[String, String]): JObject = {
+    try {
+      data.get("type") match {
+        case Some("userAction") => userActionToEventJson(data)
+        case Some("userActionItem") => userActionItemToEventJson(data)
+        case Some(x) => throw new ConnectorException(
+          s"Cannot convert unknown type ${x} to event JSON")
+        case None => throw new ConnectorException(
+          s"The field 'type' is required.")
+      }
+    } catch {
+      // already a connector error: pass it through unwrapped
+      case e: ConnectorException => throw e
+      // e.g. a missing required key or an unparsable number
+      case e: Exception => throw new ConnectorException(
+        s"Cannot convert ${data} to event JSON. ${e.getMessage()}", e)
+    }
+  }
+
+  /** Converts "userAction" form data to event JSON.
+    *
+    * Reads "event", "userId", "timestamp" and "anotherProperty1" with
+    * `data(...)`, which throws when the key is absent. The "context[...]"
+    * fields and "anotherProperty2" are optional.
+    */
+  def userActionToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    // two level optional data
+    val context = if (data.exists(_._1.startsWith("context["))) {
+      Some(
+        ("ip" -> data.get("context[ip]")) ~
+        ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~
+        ("prop2" -> data.get("context[prop2]"))
+      )
+    } else {
+      None
+    }
+
+    val json =
+      ("event" -> data("event")) ~
+      ("entityType" -> "user") ~
+      ("entityId" -> data("userId")) ~
+      ("eventTime" -> data("timestamp")) ~
+      ("properties" -> (
+        ("context" -> context) ~
+        ("anotherProperty1" -> data("anotherProperty1").toInt) ~
+        ("anotherProperty2" -> data.get("anotherProperty2"))
+      ))
+    json
+  }
+
+
+  /** Converts "userActionItem" form data to event JSON.
+    *
+    * Reads "event", "userId", "itemId", "timestamp" and the three
+    * "context[...]" fields with `data(...)`, which throws when the key is
+    * absent. "anotherPropertyA" and "anotherPropertyB" are optional.
+    */
+  def userActionItemToEventJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    val json =
+      ("event" -> data("event")) ~
+      ("entityType" -> "user") ~
+      ("entityId" -> data("userId")) ~
+      ("targetEntityType" -> "item") ~
+      ("targetEntityId" -> data("itemId")) ~
+      ("eventTime" -> data("timestamp")) ~
+      ("properties" -> (
+        ("context" -> (
+          ("ip" -> data("context[ip]")) ~
+          ("prop1" -> data("context[prop1]").toDouble) ~
+          ("prop2" -> data("context[prop2]"))
+        )) ~
+        ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~
+        ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean))
+      ))
+    json
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
new file mode 100644
index 0000000..2129134
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnector.scala
@@ -0,0 +1,153 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.examplejson
+
+import org.apache.predictionio.data.webhooks.JsonConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+import org.json4s.JObject
+
+/** Example JsonConnector with following types of webhooks JSON input:
+  *
+  * UserAction
+  *
+  * {
+  *   "type": "userAction"
+  *   "userId": "as34smg4",
+  *   "event": "do_something",
+  *   "context": {
+  *     "ip": "24.5.68.47",
+  *     "prop1": 2.345,
+  *     "prop2": "value1"
+  *   },
+  *   "anotherProperty1": 100,
+  *   "anotherProperty2": "optional1",
+  *   "timestamp": "2015-01-02T00:30:12.984Z"
+  * }
+  *
+  * UserActionItem
+  *
+  * {
+  *   "type": "userActionItem"
+  *   "userId": "as34smg4",
+  *   "event": "do_something_on",
+  *   "itemId": "kfjd312bc",
+  *   "context": {
+  *     "ip": "1.23.4.56",
+  *     "prop1": 2.345,
+  *     "prop2": "value1"
+  *   },
+  *   "anotherPropertyA": 4.567,
+  *   "anotherPropertyB": false,
+  *   "timestamp": "2015-01-15T04:20:23.567Z"
+  * }
+  */
+// The access qualifier must name an enclosing package; after the rename to
+// org.apache.predictionio that package is `predictionio`.
+private[predictionio] object ExampleJsonConnector extends JsonConnector {
+
+  implicit val json4sFormats: Formats = DefaultFormats
+
+  /** Extracts the common "type" field, then converts the data according to it.
+    *
+    * @param data original JObject received through webhooks
+    * @return Event JObject
+    * @throws ConnectorException if the common fields cannot be extracted, the
+    *                            type is unknown, or the conversion fails (the
+    *                            failure is kept as cause)
+    */
+  override def toEventJson(data: JObject): JObject = {
+    val common = try {
+      data.extract[Common]
+    } catch {
+      case e: Exception => throw new ConnectorException(
+        s"Cannot extract Common field from ${data}. ${e.getMessage()}", e)
+    }
+
+    try {
+      common.`type` match {
+        case "userAction" =>
+          toEventJson(common = common, userAction = data.extract[UserAction])
+        case "userActionItem" =>
+          toEventJson(common = common, userActionItem = data.extract[UserActionItem])
+        case x: String =>
+          throw new ConnectorException(
+            s"Cannot convert unknown type '${x}' to Event JSON.")
+      }
+    } catch {
+      // already a connector error: pass it through unwrapped
+      case e: ConnectorException => throw e
+      case e: Exception => throw new ConnectorException(
+        s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e)
+    }
+  }
+
+  /** Maps a user action to EventAPI JSON for entity type "user".
+    *
+    * @param common the common fields (not used in the mapping)
+    * @param userAction the extracted user action
+    */
+  def toEventJson(common: Common, userAction: UserAction): JObject = {
+    import org.json4s.JsonDSL._
+
+    // map to EventAPI JSON
+    val json =
+      ("event" -> userAction.event) ~
+        ("entityType" -> "user") ~
+        ("entityId" -> userAction.userId) ~
+        ("eventTime" -> userAction.timestamp) ~
+        ("properties" -> (
+          ("context" -> userAction.context) ~
+            ("anotherProperty1" -> userAction.anotherProperty1) ~
+            ("anotherProperty2" -> userAction.anotherProperty2)
+          ))
+    json
+  }
+
+  /** Maps a user-on-item action to EventAPI JSON for entity type "user" and
+    * target entity type "item".
+    *
+    * @param common the common fields (not used in the mapping)
+    * @param userActionItem the extracted user-on-item action
+    */
+  def toEventJson(common: Common, userActionItem: UserActionItem): JObject = {
+    import org.json4s.JsonDSL._
+
+    // map to EventAPI JSON
+    val json =
+      ("event" -> userActionItem.event) ~
+        ("entityType" -> "user") ~
+        ("entityId" -> userActionItem.userId) ~
+        ("targetEntityType" -> "item") ~
+        ("targetEntityId" -> userActionItem.itemId) ~
+        ("eventTime" -> userActionItem.timestamp) ~
+        ("properties" -> (
+          ("context" -> userActionItem.context) ~
+            ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~
+            ("anotherPropertyB" -> userActionItem.anotherPropertyB)
+          ))
+    json
+  }
+
+  // Common required fields
+  case class Common(
+    `type`: String
+  )
+
+  // User Actions fields
+  case class UserAction (
+    userId: String,
+    event: String,
+    context: Option[JObject],
+    anotherProperty1: Int,
+    anotherProperty2: Option[String],
+    timestamp: String
+  )
+
+  // UserActionItem fields
+  case class UserActionItem (
+    userId: String,
+    event: String,
+    itemId: String,
+    context: JObject,
+    anotherPropertyA: Option[Double],
+    anotherPropertyB: Option[Boolean],
+    timestamp: String
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
new file mode 100644
index 0000000..abf8a7f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnector.scala
@@ -0,0 +1,305 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+
+package org.apache.predictionio.data.webhooks.mailchimp
+
+import org.apache.predictionio.data.webhooks.FormConnector
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.apache.predictionio.data.storage.EventValidation
+import org.apache.predictionio.data.Utils
+
+import org.json4s.JObject
+
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+
+/** Converts MailChimp webhook form submissions into Event API JSON.
+  *
+  * `toEventJson` dispatches on the form's `type` field to one of the per-type converters
+  * below. Fields are read with `data(key)` unless noted otherwise, so a form that lacks one
+  * of them fails with the exception raised by `Map.apply` (`NoSuchElementException` for a
+  * plain `Map`); only `data[merges][INTERESTS]` is read as an optional field.
+  */
+private[predictionio] object MailChimpConnector extends FormConnector {
+
+  /** Converts one MailChimp webhook form into Event API JSON.
+    *
+    * @param data decoded form fields; `type` selects the converter
+    * @return the Event API JSON for the webhook
+    * @throws ConnectorException if `type` is missing or is not a supported value
+    */
+  override
+  def toEventJson(data: Map[String, String]): JObject = {
+    data.get("type") match {
+      // SUBSCRIBE
+      case Some("subscribe") => subscribeToEventJson(data)
+      // UNSUBSCRIBE
+      case Some("unsubscribe") => unsubscribeToEventJson(data)
+      // PROFILE UPDATES
+      case Some("profile") => profileToEventJson(data)
+      // EMAIL UPDATE
+      case Some("upemail") => upemailToEventJson(data)
+      // CLEANED EMAILS
+      case Some("cleaned") => cleanedToEventJson(data)
+      // CAMPAIGN SENDING STATUS
+      case Some("campaign") => campaignToEventJson(data)
+      // invalid type
+      case Some(x) => throw new ConnectorException(
+        s"Cannot convert unknown MailChimp data type ${x} to event JSON")
+      case None => throw new ConnectorException(
+        s"The field 'type' is required for MailChimp data.")
+    }
+  }
+
+  /** Formatter for MailChimp timestamps such as `2009-03-26 21:35:57`, interpreted in
+    * `EventValidation.defaultTimeZone`.
+    */
+  val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
+    .withZone(EventValidation.defaultTimeZone)
+
+  /** Parses a MailChimp timestamp string with `mailChimpDateTimeFormat`. */
+  def parseMailChimpDateTime(s: String): DateTime = {
+    mailChimpDateTimeFormat.parseDateTime(s)
+  }
+
+  /** Reads `fired_at` and re-renders it with `Utils.dateTimeToString` (the "convert to
+    * ISO8601 format" step every converter performs first).
+    */
+  private def eventTimeOf(data: Map[String, String]): String =
+    Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
+
+  /** Builds the nested `merges` object shared by the subscribe, unsubscribe and profile
+    * converters. `INTERESTS` is optional and is omitted when absent.
+    */
+  private def mergesToJson(data: Map[String, String]): JObject = {
+    import org.json4s.JsonDSL._
+
+    ("EMAIL" -> data("data[merges][EMAIL]")) ~
+    ("FNAME" -> data("data[merges][FNAME]")) ~
+    ("LNAME" -> data("data[merges][LNAME]")) ~
+    ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
+  }
+
+  /** Converts a `subscribe` webhook: the user (`data[id]`) is the entity and the list
+    * (`data[list_id]`) is the target entity.
+    */
+  def subscribeToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    "type": "subscribe",
+    "fired_at": "2009-03-26 21:35:57",
+    "data[id]": "8a25ff1d98",
+    "data[list_id]": "a6b5da1054",
+    "data[email]": "api@mailchimp.com",
+    "data[email_type]": "html",
+    "data[merges][EMAIL]": "api@mailchimp.com",
+    "data[merges][FNAME]": "MailChimp",
+    "data[merges][LNAME]": "API",
+    "data[merges][INTERESTS]": "Group1,Group2",
+    "data[ip_opt]": "10.20.10.30",
+    "data[ip_signup]": "10.20.10.30"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    // TODO: handle optional fields
+    // Parentheses mirror the JSON nesting: properties { ..., merges { ... }, ip_opt, ip_signup }
+    ("event" -> "subscribe") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> mergesToJson(data)) ~
+      ("ip_opt" -> data("data[ip_opt]")) ~
+      ("ip_signup" -> data("data[ip_signup]"))
+    ))
+  }
+
+  /** Converts an `unsubscribe` webhook: the user (`data[id]`) is the entity and the list
+    * (`data[list_id]`) is the target entity.
+    */
+  def unsubscribeToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    "action" will either be "unsub" or "delete".
+    The reason will be "manual" unless caused by a spam complaint - then it will be "abuse"
+
+    "type": "unsubscribe",
+    "fired_at": "2009-03-26 21:40:57",
+    "data[action]": "unsub",
+    "data[reason]": "manual",
+    "data[id]": "8a25ff1d98",
+    "data[list_id]": "a6b5da1054",
+    "data[email]": "api+unsub@mailchimp.com",
+    "data[email_type]": "html",
+    "data[merges][EMAIL]": "api+unsub@mailchimp.com",
+    "data[merges][FNAME]": "MailChimp",
+    "data[merges][LNAME]": "API",
+    "data[merges][INTERESTS]": "Group1,Group2",
+    "data[ip_opt]": "10.20.10.30",
+    "data[campaign_id]": "cb398d21d2"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    // Parentheses mirror the JSON nesting: properties { ..., merges { ... }, ip_opt, campaign_id }
+    ("event" -> "unsubscribe") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("action" -> data("data[action]")) ~
+      ("reason" -> data("data[reason]")) ~
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> mergesToJson(data)) ~
+      ("ip_opt" -> data("data[ip_opt]")) ~
+      ("campaign_id" -> data("data[campaign_id]"))
+    ))
+  }
+
+  /** Converts a `profile` (profile update) webhook: the user (`data[id]`) is the entity and
+    * the list (`data[list_id]`) is the target entity.
+    */
+  def profileToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    "type": "profile",
+    "fired_at": "2009-03-26 21:31:21",
+    "data[id]": "8a25ff1d98",
+    "data[list_id]": "a6b5da1054",
+    "data[email]": "api@mailchimp.com",
+    "data[email_type]": "html",
+    "data[merges][EMAIL]": "api@mailchimp.com",
+    "data[merges][FNAME]": "MailChimp",
+    "data[merges][LNAME]": "API",
+    "data[merges][INTERESTS]": "Group1,Group2", // OPTIONAL
+    "data[ip_opt]": "10.20.10.30"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    // Parentheses mirror the JSON nesting: properties { ..., merges { ... }, ip_opt }
+    ("event" -> "profile") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("email" -> data("data[email]")) ~
+      ("email_type" -> data("data[email_type]")) ~
+      ("merges" -> mergesToJson(data)) ~
+      ("ip_opt" -> data("data[ip_opt]"))
+    ))
+  }
+
+  /** Converts an `upemail` (email address change) webhook: the user is identified by
+    * `data[new_id]` and the list (`data[list_id]`) is the target entity.
+    */
+  def upemailToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    "type": "upemail",
+    "fired_at": "2009-03-26 22:15:09",
+    "data[list_id]": "a6b5da1054",
+    "data[new_id]": "51da8c3259",
+    "data[new_email]": "api+new@mailchimp.com",
+    "data[old_email]": "api+old@mailchimp.com"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "upemail") ~
+    ("entityType" -> "user") ~
+    ("entityId" -> data("data[new_id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("new_email" -> data("data[new_email]")) ~
+      ("old_email" -> data("data[old_email]"))
+    ))
+  }
+
+  /** Converts a `cleaned` webhook: the list (`data[list_id]`) is the entity and there is no
+    * target entity.
+    */
+  def cleanedToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    Reason will be one of "hard" (for hard bounces) or "abuse"
+    "type": "cleaned",
+    "fired_at": "2009-03-26 22:01:00",
+    "data[list_id]": "a6b5da1054",
+    "data[campaign_id]": "4fjk2ma9xd",
+    "data[reason]": "hard",
+    "data[email]": "api+cleaned@mailchimp.com"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "cleaned") ~
+    ("entityType" -> "list") ~
+    ("entityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("campaignId" -> data("data[campaign_id]")) ~
+      ("reason" -> data("data[reason]")) ~
+      ("email" -> data("data[email]"))
+    ))
+  }
+
+  /** Converts a `campaign` (sending status) webhook: the campaign (`data[id]`) is the entity
+    * and the list (`data[list_id]`) is the target entity.
+    */
+  def campaignToEventJson(data: Map[String, String]): JObject = {
+
+    import org.json4s.JsonDSL._
+
+    /*
+    "type": "campaign",
+    "fired_at": "2009-03-26 21:31:21",
+    "data[id]": "5aa2102003",
+    "data[subject]": "Test Campaign Subject",
+    "data[status]": "sent",
+    "data[reason]": "",
+    "data[list_id]": "a6b5da1054"
+    */
+
+    // convert to ISO8601 format
+    val eventTime = eventTimeOf(data)
+
+    ("event" -> "campaign") ~
+    ("entityType" -> "campaign") ~
+    ("entityId" -> data("data[id]")) ~
+    ("targetEntityType" -> "list") ~
+    ("targetEntityId" -> data("data[list_id]")) ~
+    ("eventTime" -> eventTime) ~
+    ("properties" -> (
+      ("subject" -> data("data[subject]")) ~
+      ("status" -> data("data[status]")) ~
+      ("reason" -> data("data[reason]"))
+    ))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
new file mode 100644
index 0000000..b7548b0
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnector.scala
@@ -0,0 +1,306 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.segmentio
+
+import org.apache.predictionio.data.webhooks.{ConnectorException, JsonConnector}
+import org.json4s._
+
+/** Converts segment.io webhook JSON payloads into Event API JSON.
+  *
+  * The payload's common fields are extracted into [[Common]]; its `type` then selects which
+  * `Events.*` case class is extracted and which `toEventJson` overload builds the result.
+  * Every failure surfaces as a `ConnectorException`.
+  */
+private[predictionio] object SegmentIOConnector extends JsonConnector {
+
+  import scala.util.control.NonFatal
+
+  // private lazy val supportedAPI = Vector("2", "2.0", "2.0.0")
+
+  implicit val json4sFormats: Formats = DefaultFormats
+
+  /** Converts one segment.io payload into Event API JSON.
+    *
+    * @param data the webhook payload
+    * @return the Event API JSON object
+    * @throws ConnectorException if the `version` field cannot be read, the common fields or
+    *                            the type-specific fields cannot be extracted, or the `type`
+    *                            is not one of identify, track, alias, page, screen, group
+    */
+  override
+  def toEventJson(data: JObject): JObject = {
+    // The payload must carry a readable "version" field. Its value is not validated while
+    // the supportedAPI check below stays disabled, so the result is deliberately discarded.
+    try {
+      data.values("version").toString
+/*
+      (disabled check; re-enabling it requires binding the value read above to `version`)
+      if (!supportedAPI.contains(version)) {
+        throw new ConnectorException(
+          s"Supported segment.io API versions: [2]. got [$version]"
+        )
+      }
+*/
+    } catch { case NonFatal(_) =>
+      // NonFatal lets fatal JVM errors propagate instead of masking them.
+      throw new ConnectorException(s"Failed to get segment.io API version.")
+    }
+
+    val common = try {
+      data.extract[Common]
+    } catch {
+      case NonFatal(e) => throw new ConnectorException(
+        s"Cannot extract Common field from $data. ${e.getMessage}", e
+      )
+    }
+
+    try {
+      common.`type` match {
+        case "identify" =>
+          toEventJson(
+            common = common,
+            identify = data.extract[Events.Identify]
+          )
+
+        case "track" =>
+          toEventJson(
+            common = common,
+            track = data.extract[Events.Track]
+          )
+
+        case "alias" =>
+          toEventJson(
+            common = common,
+            alias = data.extract[Events.Alias]
+          )
+
+        case "page" =>
+          toEventJson(
+            common = common,
+            page = data.extract[Events.Page]
+          )
+
+        case "screen" =>
+          toEventJson(
+            common = common,
+            screen = data.extract[Events.Screen]
+          )
+
+        case "group" =>
+          toEventJson(
+            common = common,
+            group = data.extract[Events.Group]
+          )
+
+        case _ =>
+          throw new ConnectorException(
+            s"Cannot convert unknown type ${common.`type`} to event JSON."
+          )
+      }
+    } catch {
+      // Already in the connector's error type: rethrow untouched.
+      case e: ConnectorException => throw e
+      case e: Exception =>
+        throw new ConnectorException(
+          s"Cannot convert $data to event JSON. ${e.getMessage}", e
+        )
+    }
+  }
+
+  /** Builds the event for an `identify` message; its properties carry `traits`. */
+  def toEventJson(common: Common, identify: Events.Identify ): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties = "traits" -> identify.traits
+    toJson(common, eventProperties)
+  }
+
+  /** Builds the event for a `track` message; its properties carry `properties` and `event`. */
+  def toEventJson(common: Common, track: Events.Track): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("properties" -> track.properties) ~
+      ("event" -> track.event)
+    toJson(common, eventProperties)
+  }
+
+  /** Builds the event for an `alias` message; its properties carry `previous_id`. */
+  def toEventJson(common: Common, alias: Events.Alias): JObject = {
+    import org.json4s.JsonDSL._
+    toJson(common, "previous_id" -> alias.previous_id)
+  }
+
+  /** Builds the event for a `screen` message; its properties carry `name` and `properties`. */
+  def toEventJson(common: Common, screen: Events.Screen): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("name" -> screen.name) ~
+      ("properties" -> screen.properties)
+    toJson(common, eventProperties)
+  }
+
+  /** Builds the event for a `page` message; its properties carry `name` and `properties`. */
+  def toEventJson(common: Common, page: Events.Page): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("name" -> page.name) ~
+      ("properties" -> page.properties)
+    toJson(common, eventProperties)
+  }
+
+  /** Builds the event for a `group` message; its properties carry `group_id` and `traits`. */
+  def toEventJson(common: Common, group: Events.Group): JObject = {
+    import org.json4s.JsonDSL._
+    val eventProperties =
+      ("group_id" -> group.group_id) ~
+      ("traits" -> group.traits)
+    toJson(common, eventProperties)
+  }
+
+  /** Assembles the final object: a `properties` field followed by the common event fields. */
+  private def toJson(common: Common, props: JObject): JObject = {
+    // Common fields are resolved first so a missing user id is reported before any
+    // context conversion problem.
+    val commonFields = commonToJson(common)
+    JObject(("properties" -> properties(common, props)) :: commonFields.obj)
+  }
+
+  /** Prepends the decomposed `context` (when present) to the event-specific properties. */
+  private def properties(common: Common, eventProps: JObject): JObject = {
+    import org.json4s.JsonDSL._
+    common.context match {
+      case Some(context) =>
+        try {
+          ("context" -> Extraction.decompose(context)) ~ eventProps
+        } catch {
+          case NonFatal(e) =>
+            throw new ConnectorException(
+              s"Cannot convert $context to event JSON. ${e.getMessage }", e
+            )
+        }
+
+      case None => eventProps
+    }
+  }
+
+  /** Common event fields, using the message `type` as the event name. */
+  private def commonToJson(common: Common): JObject =
+    commonToJson(common, common.`type`)
+
+  /** Common event fields with an explicit event name. The entity id is `user_id`, falling
+    * back to `anonymous_id`; a payload with neither is rejected.
+    */
+  private def commonToJson(common: Common, typ: String): JObject = {
+    import org.json4s.JsonDSL._
+    common.user_id.orElse(common.anonymous_id) match {
+      case Some(userId) =>
+        ("event" -> typ) ~
+          ("entityType" -> "user") ~
+          ("entityId" -> userId) ~
+          ("eventTime" -> common.timestamp)
+
+      case None =>
+        throw new ConnectorException(
+          "there was no `userId` or `anonymousId` in the common fields."
+        )
+    }
+  }
+}
+
+/** Type-specific payload fields of a segment.io message, one case class per message `type`
+  * that `SegmentIOConnector` extracts.
+  *
+  * The access qualifier is `predictionio` because `prediction` is not an enclosing package
+  * of `org.apache.predictionio.data.webhooks.segmentio`.
+  */
+object Events {
+
+  /** Fields of a `track` message. */
+  private[predictionio] case class Track(
+    event: String,
+    properties: Option[JObject] = None
+  )
+
+  /** Fields of an `alias` message. */
+  private[predictionio] case class Alias(previous_id: String, user_id: String)
+
+  /** Fields of a `group` message. */
+  private[predictionio] case class Group(
+    group_id: String,
+    traits: Option[JObject] = None
+  )
+
+  /** Fields of a `screen` message. */
+  private[predictionio] case class Screen(
+    name: Option[String] = None,
+    properties: Option[JObject] = None
+  )
+
+  /** Fields of a `page` message. */
+  private[predictionio] case class Page(
+    name: Option[String] = None,
+    properties: Option[JObject] = None
+  )
+
+  /** Fields of an `identify` message.
+    *
+    * NOTE(review): `user_id` is required here, whereas the connector's common-field handling
+    * falls back to `anonymous_id`; confirm whether anonymous identify messages should be
+    * accepted.
+    */
+  private[predictionio] case class Identify(
+    user_id: String,
+    traits: Option[JObject]
+  )
+
+}
+
+/** Nested structures referenced by the [[Common]] message fields (`context` and
+  * `integrations`).
+  *
+  * The access qualifier is `predictionio` because `prediction` is not an enclosing package
+  * of `org.apache.predictionio.data.webhooks.segmentio`.
+  */
+object Common {
+
+  /** Per-integration flags; every flag defaults to `false`.
+    *
+    * NOTE(review): `Salesforse` looks like a misspelling of "Salesforce". The field name is
+    * what extraction matches against, so it is left unchanged here; confirm the intended key.
+    */
+  private[predictionio] case class Integrations(
+    All: Boolean = false,
+    Mixpanel: Boolean = false,
+    Marketo: Boolean = false,
+    Salesforse: Boolean = false
+  )
+
+  /** The message `context`: `ip`, `library` and `user_agent` are required, the rest optional. */
+  private[predictionio] case class Context(
+    ip: String,
+    library: Library,
+    user_agent: String,
+    app: Option[App] = None,
+    campaign: Option[Campaign] = None,
+    device: Option[Device] = None,
+    network: Option[Network] = None,
+    location: Option[Location] = None,
+    os: Option[OS] = None,
+    referrer: Option[Referrer] = None,
+    screen: Option[Screen] = None,
+    timezone: Option[String] = None
+  )
+
+  /** Screen dimensions and density (`Context.screen`). */
+  private[predictionio] case class Screen(width: Int, height: Int, density: Int)
+
+  /** Referrer id and type (`Context.referrer`). */
+  private[predictionio] case class Referrer(id: String, `type`: String)
+
+  /** Operating system name and version (`Context.os`). */
+  private[predictionio] case class OS(name: String, version: String)
+
+  /** Location details (`Context.location`); every field is optional. */
+  private[predictionio] case class Location(
+    city: Option[String] = None,
+    country: Option[String] = None,
+    latitude: Option[Double] = None,
+    longitude: Option[Double] = None,
+    speed: Option[Int] = None
+  )
+
+  /** Page details.
+    *
+    * NOTE(review): unlike its siblings this class carries no access qualifier and is not
+    * referenced by `Context`; confirm whether that is intentional.
+    */
+  case class Page(
+    path: String,
+    referrer: String,
+    search: String,
+    title: String,
+    url: String
+  )
+
+  /** Network details (`Context.network`); every field is optional. */
+  private[predictionio] case class Network(
+    bluetooth: Option[Boolean] = None,
+    carrier: Option[String] = None,
+    cellular: Option[Boolean] = None,
+    wifi: Option[Boolean] = None
+  )
+
+  /** Client library name and version (`Context.library`). */
+  private[predictionio] case class Library(name: String, version: String)
+
+  /** Device details (`Context.device`); every field is optional. */
+  private[predictionio] case class Device(
+    id: Option[String] = None,
+    advertising_id: Option[String] = None,
+    ad_tracking_enabled: Option[Boolean] = None,
+    manufacturer: Option[String] = None,
+    model: Option[String] = None,
+    name: Option[String] = None,
+    `type`: Option[String] = None,
+    token: Option[String] = None
+  )
+
+  /** Campaign details (`Context.campaign`); every field is optional. */
+  private[predictionio] case class Campaign(
+    name: Option[String] = None,
+    source: Option[String] = None,
+    medium: Option[String] = None,
+    term: Option[String] = None,
+    content: Option[String] = None
+  )
+
+  /** Application details (`Context.app`); every field is optional. */
+  private[predictionio] case class App(
+    name: Option[String] = None,
+    version: Option[String] = None,
+    build: Option[String] = None
+  )
+
+}
+
+/** Fields shared by every segment.io message.
+  *
+  * `type`, `sent_at`, `timestamp` and `version` are required; the remaining fields are
+  * optional. The access qualifier is `predictionio` because `prediction` is not an enclosing
+  * package of `org.apache.predictionio.data.webhooks.segmentio`.
+  *
+  * @param `type` message type, used by the connector to pick the type-specific fields
+  * @param sent_at the payload's `sent_at` value
+  * @param timestamp the payload's `timestamp` value, emitted as the event time
+  * @param version the payload's `version` value
+  * @param anonymous_id fallback entity id when `user_id` is absent
+  * @param user_id preferred entity id
+  * @param context optional message context
+  * @param integrations optional per-integration flags
+  */
+private[predictionio] case class Common(
+  `type`: String,
+  sent_at: String,
+  timestamp: String,
+  version: String,
+  anonymous_id: Option[String] = None,
+  user_id: Option[String] = None,
+  context: Option[Common.Context] = None,
+  integrations: Option[Common.Integrations] = None
+)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala b/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
deleted file mode 100644
index 9f7a74e..0000000
--- a/data/src/test/scala/io/prediction/data/api/EventServiceSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.api
-
-import io.prediction.data.storage.Storage
-
-import akka.testkit.TestProbe
-import akka.actor.ActorSystem
-import akka.actor.Props
-
-import spray.http.HttpEntity
-import spray.http.HttpResponse
-import spray.http.ContentTypes
-import spray.httpx.RequestBuilding.Get
-
-import org.specs2.mutable.Specification
-
-class EventServiceSpec extends Specification {
-
-  val system = ActorSystem("EventServiceSpecSystem")
-
-  val eventClient = Storage.getLEvents()
-  val accessKeysClient = Storage.getMetaDataAccessKeys()
-  val channelsClient = Storage.getMetaDataChannels()
-  
-  val eventServiceActor = system.actorOf(
-    Props(
-      new EventServiceActor(
-        eventClient,
-        accessKeysClient,
-        channelsClient,
-        EventServerConfig()
-      )
-    )
-  )
-
-  "GET / request" should {
-    "properly produce OK HttpResponses" in {
-      val probe = TestProbe()(system)
-      probe.send(eventServiceActor, Get("/"))
-      probe.expectMsg(
-        HttpResponse(
-          200,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"status":"alive"}"""
-          )
-        )
-      )
-      success
-    }
-  }
-
-  step(system.shutdown())
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
deleted file mode 100644
index bae0f0b..0000000
--- a/data/src/test/scala/io/prediction/data/api/SegmentIOAuthSpec.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-package io.prediction.data.api
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import io.prediction.data.storage._
-import org.joda.time.DateTime
-import org.specs2.mutable.Specification
-import spray.http.HttpHeaders.RawHeader
-import spray.http.{ContentTypes, HttpEntity, HttpResponse}
-import spray.httpx.RequestBuilding._
-import sun.misc.BASE64Encoder
-
-import scala.concurrent.{Future, ExecutionContext}
-
-class SegmentIOAuthSpec extends Specification {
-
-  val system = ActorSystem("EventServiceSpecSystem")
-  sequential
-  isolated
-  val eventClient = new LEvents {
-    override def init(appId: Int, channelId: Option[Int]): Boolean = true
-
-    override def futureInsert(event: Event, appId: Int, channelId: Option[Int])
-        (implicit ec: ExecutionContext): Future[String] =
-      Future successful "event_id"
-
-    override def futureFind(
-      appId: Int, channelId: Option[Int], startTime: Option[DateTime],
-      untilTime: Option[DateTime], entityType: Option[String],
-      entityId: Option[String], eventNames: Option[Seq[String]],
-      targetEntityType: Option[Option[String]],
-      targetEntityId: Option[Option[String]], limit: Option[Int],
-      reversed: Option[Boolean])
-        (implicit ec: ExecutionContext): Future[Iterator[Event]] =
-      Future successful List.empty[Event].iterator
-
-    override def futureGet(eventId: String, appId: Int, channelId: Option[Int])
-        (implicit ec: ExecutionContext): Future[Option[Event]] =
-      Future successful None
-
-    override def remove(appId: Int, channelId: Option[Int]): Boolean = true
-
-    override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])
-        (implicit ec: ExecutionContext): Future[Boolean] =
-      Future successful true
-
-    override def close(): Unit = {}
-  }
-  val appId = 0
-  val accessKeysClient = new AccessKeys {
-    override def insert(k: AccessKey): Option[String] = null
-    override def getByAppid(appid: Int): Seq[AccessKey] = null
-    override def update(k: AccessKey): Unit = {}
-    override def delete(k: String): Unit = {}
-    override def getAll(): Seq[AccessKey] = null
-
-    override def get(k: String): Option[AccessKey] =
-      k match {
-        case "abc" \u21d2 Some(AccessKey(k, appId, Seq.empty))
-        case _ \u21d2 None
-      }
-  }
-
-  val channelsClient = Storage.getMetaDataChannels()
-  val eventServiceActor = system.actorOf(
-    Props(
-      new EventServiceActor(
-        eventClient,
-        accessKeysClient,
-        channelsClient,
-        EventServerConfig()
-      )
-    )
-  )
-
-  val base64Encoder = new BASE64Encoder
-
-  "Event Service" should {
-
-    "reject with CredentialsRejected with invalid credentials" in {
-      val accessKey = "abc123:"
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-          .withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKey")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Invalid accessKey."}"""
-          )
-        )
-      )
-      success
-    }
-
-    "reject with CredentialsMissed without credentials" in {
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post("/webhooks/segmentio.json")
-      )
-      probe.expectMsg(
-        HttpResponse(
-          401,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"message":"Missing accessKey."}"""
-          )
-        )
-      )
-      success
-    }
-
-    "process SegmentIO identity request properly" in {
-      val jsonReq =
-        """
-          |{
-          |  "anonymous_id": "507f191e810c19729de860ea",
-          |  "channel": "browser",
-          |  "context": {
-          |    "ip": "8.8.8.8",
-          |    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
-          |  },
-          |  "message_id": "022bb90c-bbac-11e4-8dfc-aa07a5b093db",
-          |  "timestamp": "2015-02-23T22:28:55.387Z",
-          |  "sent_at": "2015-02-23T22:28:55.111Z",
-          |  "traits": {
-          |    "name": "Peter Gibbons",
-          |    "email": "peter@initech.com",
-          |    "plan": "premium",
-          |    "logins": 5
-          |  },
-          |  "type": "identify",
-          |  "user_id": "97980cfea0067",
-          |  "version": "2"
-          |}
-        """.stripMargin
-
-      val accessKey = "abc:"
-      val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes)
-      val probe = TestProbe()(system)
-      probe.send(
-        eventServiceActor,
-        Post(
-          "/webhooks/segmentio.json",
-          HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
-        ).withHeaders(
-            List(
-              RawHeader("Authorization", s"Basic $accessKeyEncoded")
-            )
-          )
-      )
-      probe.expectMsg(
-        HttpResponse(
-          201,
-          HttpEntity(
-            contentType = ContentTypes.`application/json`,
-            string = """{"eventId":"event_id"}"""
-          )
-        )
-      )
-      success
-    }
-  }
-
-  step(system.shutdown())
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
deleted file mode 100644
index e6d28b3..0000000
--- a/data/src/test/scala/io/prediction/data/storage/BiMapSpec.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-
-class BiMapSpec extends Specification {
-
-  System.clearProperty("spark.driver.port")
-  System.clearProperty("spark.hostPort")
-  val sc = new SparkContext("local[4]", "BiMapSpec test")
-
-  "BiMap created with map" should {
-
-    val keys = Seq(1, 4, 6)
-    val orgValues = Seq(2, 5, 7)
-    val org = keys.zip(orgValues).toMap
-    val bi = BiMap(org)
-
-    "return correct values for each key of original map" in {
-      val biValues = keys.map(k => bi(k))
-
-      biValues must beEqualTo(orgValues)
-    }
-
-    "get return Option[V]" in {
-      val checkKeys = keys ++ Seq(12345)
-      val biValues = checkKeys.map(k => bi.get(k))
-      val expected = orgValues.map(Some(_)) ++ Seq(None)
-
-      biValues must beEqualTo(expected)
-    }
-
-    "getOrElse return value for each key of original map" in {
-      val biValues = keys.map(k => bi.getOrElse(k, -1))
-
-      biValues must beEqualTo(orgValues)
-    }
-
-    "getOrElse return default values for invalid key" in {
-      val keys = Seq(999, -1, -2)
-      val defaults = Seq(1234, 5678, 987)
-      val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) }
-
-      biValues must beEqualTo(defaults)
-    }
-
-    "contains() returns true/false correctly" in {
-      val checkKeys = keys ++ Seq(12345)
-      val biValues = checkKeys.map(k => bi.contains(k))
-      val expected = orgValues.map(_ => true) ++ Seq(false)
-
-      biValues must beEqualTo(expected)
-    }
-
-    "same size as original map" in {
-      (bi.size) must beEqualTo(org.size)
-    }
-
-    "take(2) returns BiMap of size 2" in {
-      bi.take(2).size must beEqualTo(2)
-    }
-
-    "toMap contain same element as original map" in {
-      (bi.toMap) must beEqualTo(org)
-    }
-
-    "toSeq contain same element as original map" in {
-      (bi.toSeq) must containTheSameElementsAs(org.toSeq)
-    }
-
-    "inverse and return correct keys for each values of original map" in {
-      val biKeys = orgValues.map(v => bi.inverse(v))
-      biKeys must beEqualTo(keys)
-    }
-
-    "inverse with same size" in {
-      bi.inverse.size must beEqualTo(org.size)
-    }
-
-    "inverse's inverse reference back to the same original object" in {
-      // NOTE: reference equality
-      bi.inverse.inverse == bi
-    }
-  }
-
-  "BiMap created with duplicated values in map" should {
-    val dup = Map(1 -> 2, 4 -> 7, 6 -> 7)
-    "return IllegalArgumentException" in {
-      BiMap(dup) must throwA[IllegalArgumentException]
-    }
-  }
-
-  "BiMap.stringLong and stringInt" should {
-
-    "create BiMap from set of string" in {
-      val keys = Set("a", "b", "foo", "bar")
-      val values: Seq[Long] = Seq(0, 1, 2, 3)
-
-      val bi = BiMap.stringLong(keys)
-      val biValues = keys.map(k => bi(k))
-
-      val biInt = BiMap.stringInt(keys)
-      val valuesInt: Seq[Int] = values.map(_.toInt)
-      val biIntValues = keys.map(k => biInt(k))
-
-      biValues must containTheSameElementsAs(values) and
-        (biIntValues must containTheSameElementsAs(valuesInt))
-    }
-
-    "create BiMap from Array of unique string" in {
-      val keys = Array("a", "b", "foo", "bar")
-      val values: Seq[Long] = Seq(0, 1, 2, 3)
-
-      val bi = BiMap.stringLong(keys)
-      val biValues = keys.toSeq.map(k => bi(k))
-
-      val biInt = BiMap.stringInt(keys)
-      val valuesInt: Seq[Int] = values.map(_.toInt)
-      val biIntValues = keys.toSeq.map(k => biInt(k))
-
-      biValues must containTheSameElementsAs(values) and
-        (biIntValues must containTheSameElementsAs(valuesInt))
-    }
-
-    "not guarantee sequential index for Array with duplicated string" in {
-      val keys = Array("a", "b", "foo", "bar", "a", "b", "x")
-      val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6)
-      val values = keys.zip(dupValues).toMap.values.toSeq
-
-      val bi = BiMap.stringLong(keys)
-      val biValues = keys.toSet[String].map(k => bi(k))
-
-      val biInt = BiMap.stringInt(keys)
-      val valuesInt: Seq[Int] = values.map(_.toInt)
-      val biIntValues = keys.toSet[String].map(k => biInt(k))
-
-      biValues must containTheSameElementsAs(values) and
-        (biIntValues must containTheSameElementsAs(valuesInt))
-    }
-
-    "create BiMap from RDD[String]" in {
-
-      val keys = Seq("a", "b", "foo", "bar")
-      val values: Seq[Long] = Seq(0, 1, 2, 3)
-      val rdd = sc.parallelize(keys)
-
-      val bi = BiMap.stringLong(rdd)
-      val biValues = keys.map(k => bi(k))
-
-      val biInt = BiMap.stringInt(rdd)
-      val valuesInt: Seq[Int] = values.map(_.toInt)
-      val biIntValues = keys.map(k => biInt(k))
-
-      biValues must containTheSameElementsAs(values) and
-        (biIntValues must containTheSameElementsAs(valuesInt))
-    }
-
-    "create BiMap from RDD[String] with duplicated string" in {
-
-      val keys = Seq("a", "b", "foo", "bar", "a", "b", "x")
-      val values: Seq[Long] = Seq(0, 1, 2, 3, 4)
-      val rdd = sc.parallelize(keys)
-
-      val bi = BiMap.stringLong(rdd)
-      val biValues = keys.distinct.map(k => bi(k))
-
-      val biInt = BiMap.stringInt(rdd)
-      val valuesInt: Seq[Int] = values.map(_.toInt)
-      val biIntValues = keys.distinct.map(k => biInt(k))
-
-      biValues must containTheSameElementsAs(values) and
-        (biIntValues must containTheSameElementsAs(valuesInt))
-    }
-  }
-
-  step(sc.stop())
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala b/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
deleted file mode 100644
index 97e9b09..0000000
--- a/data/src/test/scala/io/prediction/data/storage/DataMapSpec.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-class DataMapSpec extends Specification {
-
-  "DataMap" should {
-
-    val properties = DataMap("""
-      {
-        "prop1" : 1,
-        "prop2" : "value2",
-        "prop3" : [1, 2, 3],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c", "c"],
-        "prop6" : 4.56
-      }
-      """)
-
-    "get Int data" in {
-      properties.get[Int]("prop1") must beEqualTo(1)
-      properties.getOpt[Int]("prop1") must beEqualTo(Some(1))
-    }
-
-    "get String data" in {
-      properties.get[String]("prop2") must beEqualTo("value2")
-      properties.getOpt[String]("prop2") must beEqualTo(Some("value2"))
-    }
-
-    "get List of Int data" in {
-      properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3))
-      properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3)))
-    }
-
-    "get Boolean data" in {
-      properties.get[Boolean]("prop4") must beEqualTo(true)
-      properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true))
-    }
-
-    "get List of String data" in {
-      properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c"))
-      properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c")))
-    }
-
-    "get Set of String data" in {
-      properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c"))
-      properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c")))
-    }
-
-    "get Double data" in {
-      properties.get[Double]("prop6") must beEqualTo(4.56)
-      properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56))
-    }
-
-    "get empty optional Int data" in {
-      properties.getOpt[Int]("prop9999") must beEqualTo(None)
-    }
-
-  }
-
-  "DataMap with multi-level data" should {
-    val properties = DataMap("""
-      {
-        "context": {
-          "ip": "1.23.4.56",
-          "prop1": 2.345
-          "prop2": "value1",
-          "prop4": [1, 2, 3]
-        },
-        "anotherPropertyA": 4.567,
-        "anotherPropertyB": false
-      }
-      """)
-
-    "get case class data" in {
-      val expected = DataMapSpec.Context(
-        ip = "1.23.4.56",
-        prop1 = Some(2.345),
-        prop2 = Some("value1"),
-        prop3 = None,
-        prop4 = List(1,2,3)
-      )
-
-      properties.get[DataMapSpec.Context]("context") must beEqualTo(expected)
-    }
-
-    "get empty optional case class data" in {
-      properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None)
-    }
-
-    "get double data" in {
-      properties.get[Double]("anotherPropertyA") must beEqualTo(4.567)
-    }
-
-    "get boolean data" in {
-      properties.get[Boolean]("anotherPropertyB") must beEqualTo(false)
-    }
-  }
-
-  "DataMap extract" should {
-
-    "extract to case class object" in {
-      val properties = DataMap("""
-        {
-          "prop1" : 1,
-          "prop2" : "value2",
-          "prop3" : [1, 2, 3],
-          "prop4" : true,
-          "prop5" : ["a", "b", "c", "c"],
-          "prop6" : 4.56
-        }
-        """)
-
-      val result = properties.extract[DataMapSpec.BasicProperty]
-      val expected = DataMapSpec.BasicProperty(
-        prop1 = 1,
-        prop2 = "value2",
-        prop3 = List(1,2,3),
-        prop4 = true,
-        prop5 = List("a", "b", "c", "c"),
-        prop6 = 4.56
-      )
-
-      result must beEqualTo(expected)
-    }
-
-    "extract with optional fields" in {
-      val propertiesEmpty = DataMap("""{}""")
-      val propertiesSome = DataMap("""
-        {
-          "prop1" : 1,
-          "prop5" : ["a", "b", "c", "c"],
-          "prop6" : 4.56
-        }
-        """)
-
-      val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty]
-      val expectedEmpty = DataMapSpec.OptionProperty(
-        prop1 = None,
-        prop2 = None,
-        prop3 = None,
-        prop4 = None,
-        prop5 = None,
-        prop6 = None
-      )
-
-      val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty]
-      val expectedSome = DataMapSpec.OptionProperty(
-        prop1 = Some(1),
-        prop2 = None,
-        prop3 = None,
-        prop4 = None,
-        prop5 = Some(List("a", "b", "c", "c")),
-        prop6 = Some(4.56)
-      )
-
-      resultEmpty must beEqualTo(expectedEmpty)
-      resultSome must beEqualTo(expectedSome)
-    }
-
-    "extract to multi-level object" in {
-      val properties = DataMap("""
-        {
-          "context": {
-            "ip": "1.23.4.56",
-            "prop1": 2.345
-            "prop2": "value1",
-            "prop4": [1, 2, 3]
-          },
-          "anotherPropertyA": 4.567,
-          "anotherPropertyB": false
-        }
-        """)
-
-      val result = properties.extract[DataMapSpec.MultiLevelProperty]
-      val expected = DataMapSpec.MultiLevelProperty(
-        context = DataMapSpec.Context(
-          ip = "1.23.4.56",
-          prop1 = Some(2.345),
-          prop2 = Some("value1"),
-          prop3 = None,
-          prop4 = List(1,2,3)
-        ),
-        anotherPropertyA = 4.567,
-        anotherPropertyB = false
-      )
-
-      result must beEqualTo(expected)
-    }
-
-  }
-}
-
-object DataMapSpec {
-
-  // define this case class inside object to avoid case class name conflict with other tests
-  case class Context(
-    ip: String,
-    prop1: Option[Double],
-    prop2: Option[String],
-    prop3: Option[Int],
-    prop4: List[Int]
-  )
-
-  case class BasicProperty(
-    prop1: Int,
-    prop2: String,
-    prop3: List[Int],
-    prop4: Boolean,
-    prop5: List[String],
-    prop6: Double
-  )
-
-  case class OptionProperty(
-    prop1: Option[Int],
-    prop2: Option[String],
-    prop3: Option[List[Int]],
-    prop4: Option[Boolean],
-    prop5: Option[List[String]],
-    prop6: Option[Double]
-  )
-
-  case class MultiLevelProperty(
-    context: Context,
-    anotherPropertyA: Double,
-    anotherPropertyB: Boolean
-  )
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
deleted file mode 100644
index 77a66d5..0000000
--- a/data/src/test/scala/io/prediction/data/storage/LEventAggregatorSpec.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.json4s.JObject
-import org.json4s.native.JsonMethods.parse
-
-import org.joda.time.DateTime
-
-class LEventAggregatorSpec extends Specification with TestEvents {
-
-  "LEventAggregator.aggregateProperties()" should {
-
-    "aggregate two entities' properties as DataMap correctly" in {
-      val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-      val result: Map[String, DataMap] =
-        LEventAggregator.aggregateProperties(events.toIterator)
-
-      val expected = Map(
-        "u1" -> DataMap(u1),
-        "u2" -> DataMap(u2)
-      )
-
-      result must beEqualTo(expected)
-    }
-
-    "aggregate two entities' properties as PropertyMap correctly" in {
-      val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-      val result: Map[String, PropertyMap] =
-        LEventAggregator.aggregateProperties(events.toIterator)
-
-      val expected = Map(
-        "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-      )
-
-      result must beEqualTo(expected)
-    }
-
-
-    "aggregate deleted entity correctly" in {
-      val events = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)
-
-      val result = LEventAggregator.aggregateProperties(events.toIterator)
-      val expected = Map(
-        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-      )
-
-      result must beEqualTo(expected)
-    }
-  }
-
-
-  "LEventAggregator.aggregatePropertiesSingle()" should {
-
-    "aggregate single entity properties as DataMap correctly" in {
-        val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
-        val eventsIt = events.toIterator
-
-        val result: Option[DataMap] = LEventAggregator
-          .aggregatePropertiesSingle(eventsIt)
-        val expected = DataMap(u1)
-
-        result must beEqualTo(Some(expected))
-    }
-
-    "aggregate single entity properties as PropertyMap correctly" in {
-        val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
-        val eventsIt = events.toIterator
-
-        val result: Option[PropertyMap] = LEventAggregator
-          .aggregatePropertiesSingle(eventsIt)
-        val expected = PropertyMap(u1, u1BaseTime, u1LastTime)
-
-        result must beEqualTo(Some(expected))
-    }
-
-    "aggregate deleted entity correctly" in {
-      // put the delete event in the middle
-      val events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5)
-      val eventsIt = events.toIterator
-
-      val result = LEventAggregator.aggregatePropertiesSingle(eventsIt)
-
-      result must beEqualTo(None)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala b/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
deleted file mode 100644
index 5b38cdb..0000000
--- a/data/src/test/scala/io/prediction/data/storage/LEventsSpec.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-class LEventsSpec extends Specification with TestEvents {
-  def is = s2"""
-
-  PredictionIO Storage LEvents Specification
-
-    Events can be implemented by:
-    - HBLEvents ${hbEvents}
-    - JDBCLEvents ${jdbcLEvents}
-
-  """
-
-  def hbEvents = sequential ^ s2"""
-
-    HBLEvents should
-    - behave like any LEvents implementation ${events(hbDO)}
-    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
-  """
-
-  def jdbcLEvents = sequential ^ s2"""
-
-    JDBCLEvents should
-    - behave like any LEvents implementation ${events(jdbcDO)}
-
-  """
-
-  val appId = 1
-
-  def events(eventClient: LEvents) = sequential ^ s2"""
-
-    init default ${initDefault(eventClient)}
-    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
-    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
-    insert and delete by ID ${insertAndDelete(eventClient)}
-    insert test user events ${insertTestUserEvents(eventClient)}
-    find user events ${findUserEvents(eventClient)}
-    aggregate user properties ${aggregateUserProperties(eventClient)}
-    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
-    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
-    init channel ${initChannel(eventClient)}
-    insert 2 events to channel ${insertChannel(eventClient)}
-    insert 1 event to channel and delete by ID  ${insertAndDeleteChannel(eventClient)}
-    find events from channel ${findChannel(eventClient)}
-    remove default ${removeDefault(eventClient)}
-    remove channel ${removeChannel(eventClient)}
-
-  """
-
-  val dbName = "test_pio_storage_events_" + hashCode
-  def hbDO = Storage.getDataObject[LEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
-
-  def initDefault(eventClient: LEvents) = {
-    eventClient.init(appId)
-  }
-
-  def insertAndGetEvents(eventClient: LEvents) = {
-
-    // events from TestEvents trait
-    val listOfEvents = List(r1,r2,r3)
-
-    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
-    val insertedEventId: List[String] = insertResp
-
-    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
-      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
-    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
-    val getEvents = getResp
-
-    insertedEvent must containTheSameElementsAs(getEvents)
-  }
-
-  def insertAndGetTimezone(eventClient: LEvents) = {
-    val listOfEvents = List(tz1, tz2, tz3)
-
-    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
-    val insertedEventId: List[String] = insertResp
-
-    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
-      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
-    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
-    val getEvents = getResp
-
-    insertedEvent must containTheSameElementsAs(getEvents)
-  }
-
-  def insertAndDelete(eventClient: LEvents) = {
-    val eventId = eventClient.insert(r2, appId)
-
-    val resultBefore = eventClient.get(eventId, appId)
-
-    val expectedBefore = r2.copy(eventId = Some(eventId))
-
-    val deleteStatus = eventClient.delete(eventId, appId)
-
-    val resultAfter = eventClient.get(eventId, appId)
-
-    (resultBefore must beEqualTo(Some(expectedBefore))) and
-    (deleteStatus must beEqualTo(true)) and
-    (resultAfter must beEqualTo(None))
-  }
-
-  def insertTestUserEvents(eventClient: LEvents) = {
-    // events from TestEvents trait
-    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
-    listOfEvents.map{ eventClient.insert(_, appId) }
-
-    success
-  }
-
-  def findUserEvents(eventClient: LEvents) = {
-
-    val results: List[Event] = eventClient.find(
-      appId = appId,
-      entityType = Some("user"))
-      .toList
-      .map(e => e.copy(eventId = None)) // ignore eventID
-
-    // same events in insertTestUserEvents
-    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def aggregateUserProperties(eventClient: LEvents) = {
-
-    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
-      appId = appId,
-      entityType = "user")
-
-    val expected = Map(
-      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateOneUserProperties(eventClient: LEvents) = {
-    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
-      appId = appId,
-      entityType = "user",
-      entityId = "u1")
-
-    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
-    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
-      appId = appId,
-      entityType = "user",
-      entityId = "u999999")
-
-    result must beEqualTo(None)
-  }
-
-  val channelId = 12
-
-  def initChannel(eventClient: LEvents) = {
-    eventClient.init(appId, Some(channelId))
-  }
-
-  def insertChannel(eventClient: LEvents) = {
-
-    // events from TestEvents trait
-    val listOfEvents = List(r4,r5)
-
-    listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
-
-    success
-  }
-
-  def insertAndDeleteChannel(eventClient: LEvents) = {
-
-    val eventId = eventClient.insert(r2, appId, Some(channelId))
-
-    val resultBefore = eventClient.get(eventId, appId, Some(channelId))
-
-    val expectedBefore = r2.copy(eventId = Some(eventId))
-
-    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
-
-    val resultAfter = eventClient.get(eventId, appId, Some(channelId))
-
-    (resultBefore must beEqualTo(Some(expectedBefore))) and
-    (deleteStatus must beEqualTo(true)) and
-    (resultAfter must beEqualTo(None))
-  }
-
-  def findChannel(eventClient: LEvents) = {
-
-    val results: List[Event] = eventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )
-    .toList
-    .map(e => e.copy(eventId = None)) // ignore eventId
-
-    // same events in insertChannel
-    val expected = List(r4, r5)
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def removeDefault(eventClient: LEvents) = {
-    eventClient.remove(appId)
-  }
-
-  def removeChannel(eventClient: LEvents) = {
-    eventClient.remove(appId, Some(channelId))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
deleted file mode 100644
index b00ec7c..0000000
--- a/data/src/test/scala/io/prediction/data/storage/PEventAggregatorSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2.mutable._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-class PEventAggregatorSpec extends Specification with TestEvents {
-
-  System.clearProperty("spark.driver.port")
-  System.clearProperty("spark.hostPort")
-  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
-  "PEventAggregator" should {
-
-    "aggregate two entities' properties as DataMap/PropertyMap correctly" in {
-      val events = sc.parallelize(Seq(
-        u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2))
-
-      val users = PEventAggregator.aggregateProperties(events)
-
-      val userMap = users.collectAsMap.toMap
-      val expectedDM = Map(
-        "u1" -> DataMap(u1),
-        "u2" -> DataMap(u2)
-      )
-
-      val expectedPM = Map(
-        "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-      )
-
-      userMap must beEqualTo(expectedDM)
-      userMap must beEqualTo(expectedPM)
-    }
-
-    "aggregate deleted entity correctly" in {
-      // put the delete event in middle
-      val events = sc.parallelize(Seq(
-        u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2))
-
-      val users = PEventAggregator.aggregateProperties(events)
-
-      val userMap = users.collectAsMap.toMap
-      val expectedPM = Map(
-        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-      )
-
-      userMap must beEqualTo(expectedPM)
-    }
-
-  }
-
-  step(sc.stop())
-}