You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:38 UTC

[07/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala b/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
deleted file mode 100644
index 74614b2..0000000
--- a/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class PEventsSpec extends Specification with TestEvents {
-
-  System.clearProperty("spark.driver.port")
-  System.clearProperty("spark.hostPort")
-  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
-  val appId = 1
-  val channelId = 6
-  val dbName = "test_pio_storage_events_" + hashCode
-
-  def hbLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def hbPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def jdbcLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  def jdbcPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  def stopSpark = {
-    sc.stop()
-  }
-
-  def is = s2"""
-
-  PredictionIO Storage PEvents Specification
-
-    PEvents can be implemented by:
-    - HBPEvents ${hbPEvents}
-    - JDBCPEvents ${jdbcPEvents}
-    - (stop Spark) ${Step(sc.stop())}
-
-  """
-
-  def hbPEvents = sequential ^ s2"""
-
-    HBPEvents should
-    - behave like any PEvents implementation ${events(hbLocal, hbPar)}
-    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
-  """
-
-  def jdbcPEvents = sequential ^ s2"""
-
-    JDBCPEvents should
-    - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
-    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
-    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
-
-  """
-
-  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
-
-    - (init test) ${initTest(localEventClient)}
-    - (insert test events) ${insertTestEvents(localEventClient)}
-    find in default ${find(parEventClient)}
-    find in channel ${findChannel(parEventClient)}
-    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
-    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
-    write to default ${write(parEventClient)}
-    write to channel ${writeChannel(parEventClient)}
-
-  """
-
-  /* setup */
-
-  // events from TestEvents trait
-  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
-  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
-
-  def initTest(localEventClient: LEvents) = {
-    localEventClient.init(appId)
-    localEventClient.init(appId, Some(channelId))
-  }
-
-  def insertTestEvents(localEventClient: LEvents) = {
-    listOfEvents.map( localEventClient.insert(_, appId) )
-    // insert to channel
-    listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) )
-    success
-  }
-
-  /* following are tests */
-
-  def find(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map {_.copy(eventId = None)} // ignore eventId
-
-    results must containTheSameElementsAs(listOfEvents)
-  }
-
-  def findChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map {_.copy(eventId = None)} // ignore eventId
-
-    results must containTheSameElementsAs(listOfEventsChannel)
-  }
-
-  def aggregateUserProperties(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      channelId = Some(channelId),
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def write(parEventClient: PEvents) = {
-    val written = List(r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId)(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map { _.copy(eventId = None)} // ignore eventId
-
-    val expected = listOfEvents ++ written
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def writeChannel(parEventClient: PEvents) = {
-    val written = List(r1, r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map { _.copy(eventId = None)} // ignore eventId
-
-    val expected = listOfEventsChannel ++ written
-
-    results must containTheSameElementsAs(expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala b/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
deleted file mode 100644
index 74615a1..0000000
--- a/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import io.prediction.data.storage.hbase.HBLEvents
-import scalikejdbc._
-
-object StorageTestUtils {
-  val hbaseSourceName = "HBASE"
-  val jdbcSourceName = "PGSQL"
-
-  def dropHBaseNamespace(namespace: String): Unit = {
-    val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
-      .asInstanceOf[HBLEvents]
-    val admin = eventDb.client.admin
-    val tableNames = admin.listTableNamesByNamespace(namespace)
-    tableNames.foreach { name =>
-      admin.disableTable(name)
-      admin.deleteTable(name)
-    }
-
-    //Only empty namespaces (no tables) can be removed.
-    admin.deleteNamespace(namespace)
-  }
-
-  def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
-    SQL(s"drop table $table").execute().apply()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/TestEvents.scala b/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
deleted file mode 100644
index 4fc2469..0000000
--- a/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.storage
-
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-
-trait TestEvents {
-
-  val u1BaseTime = new DateTime(654321)
-  val u2BaseTime = new DateTime(6543210)
-  val u3BaseTime = new DateTime(6543410)
-
-  // u1 events
-  val u1e1 = Event(
-    event = "$set",
-    entityType = "user",
-    entityId = "u1",
-    properties = DataMap(
-      """{
-        "a" : 1,
-        "b" : "value2",
-        "d" : [1, 2, 3],
-      }"""),
-    eventTime = u1BaseTime
-  )
-
-  val u1e2 = u1e1.copy(
-    event = "$set",
-    properties = DataMap("""{"a" : 2}"""),
-    eventTime = u1BaseTime.plusDays(1)
-  )
-
-  val u1e3 = u1e1.copy(
-    event = "$set",
-    properties = DataMap("""{"b" : "value4"}"""),
-    eventTime = u1BaseTime.plusDays(2)
-  )
-
-  val u1e4 = u1e1.copy(
-    event = "$unset",
-    properties = DataMap("""{"b" : null}"""),
-    eventTime = u1BaseTime.plusDays(3)
-  )
-
-  val u1e5 = u1e1.copy(
-    event = "$set",
-    properties = DataMap("""{"e" : "new"}"""),
-    eventTime = u1BaseTime.plusDays(4)
-  )
-
-  val u1LastTime = u1BaseTime.plusDays(4)
-  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
-
-  // delete event for u1
-  val u1ed = u1e1.copy(
-    event = "$delete",
-    properties = DataMap(),
-    eventTime = u1BaseTime.plusDays(5)
-  )
-
-  // u2 events
-  val u2e1 = Event(
-    event = "$set",
-    entityType = "user",
-    entityId = "u2",
-    properties = DataMap(
-      """{
-        "a" : 21,
-        "b" : "value12",
-        "d" : [7, 5, 6],
-      }"""),
-    eventTime = u2BaseTime
-  )
-
-  val u2e2 = u2e1.copy(
-    event = "$unset",
-    properties = DataMap("""{"a" : null}"""),
-    eventTime = u2BaseTime.plusDays(1)
-  )
-
-  val u2e3 = u2e1.copy(
-    event = "$set",
-    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
-    eventTime = u2BaseTime.plusDays(2)
-  )
-
-  val u2LastTime = u2BaseTime.plusDays(2)
-  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
-
-  // u3 events
-  val u3e1 = Event(
-    event = "$set",
-    entityType = "user",
-    entityId = "u3",
-    properties = DataMap(
-      """{
-        "a" : 22,
-        "b" : "value13",
-        "d" : [5, 6, 1],
-      }"""),
-    eventTime = u3BaseTime
-  )
-
-  val u3e2 = u3e1.copy(
-    event = "$unset",
-    properties = DataMap("""{"a" : null}"""),
-    eventTime = u3BaseTime.plusDays(1)
-  )
-
-  val u3e3 = u3e1.copy(
-    event = "$set",
-    properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
-    eventTime = u3BaseTime.plusDays(2)
-  )
-
-  val u3LastTime = u3BaseTime.plusDays(2)
-  val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
-
-  // some random events
-  val r1 = Event(
-    event = "my_event",
-    entityType = "my_entity_type",
-    entityId = "my_entity_id",
-    targetEntityType = Some("my_target_entity_type"),
-    targetEntityId = Some("my_target_entity_id"),
-    properties = DataMap(
-      """{
-        "prop1" : 1,
-        "prop2" : "value2",
-        "prop3" : [1, 2, 3],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c"],
-        "prop6" : 4.56
-      }"""
-    ),
-    eventTime = DateTime.now,
-    prId = Some("my_prid")
-  )
-  val r2 = Event(
-    event = "my_event2",
-    entityType = "my_entity_type2",
-    entityId = "my_entity_id2"
-  )
-  val r3 = Event(
-    event = "my_event3",
-    entityType = "my_entity_type",
-    entityId = "my_entity_id",
-    targetEntityType = Some("my_target_entity_type"),
-    targetEntityId = Some("my_target_entity_id"),
-    properties = DataMap(
-      """{
-        "propA" : 1.2345,
-        "propB" : "valueB",
-      }"""
-    ),
-    prId = Some("my_prid")
-  )
-  val r4 = Event(
-    event = "my_event4",
-    entityType = "my_entity_type4",
-    entityId = "my_entity_id4",
-    targetEntityType = Some("my_target_entity_type4"),
-    targetEntityId = Some("my_target_entity_id4"),
-    properties = DataMap(
-      """{
-        "prop1" : 1,
-        "prop2" : "value2",
-        "prop3" : [1, 2, 3],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c"],
-        "prop6" : 4.56
-      }"""),
-    eventTime = DateTime.now
-  )
-  val r5 = Event(
-    event = "my_event5",
-    entityType = "my_entity_type5",
-    entityId = "my_entity_id5",
-    targetEntityType = Some("my_target_entity_type5"),
-    targetEntityId = Some("my_target_entity_id5"),
-    properties = DataMap(
-      """{
-        "prop1" : 1,
-        "prop2" : "value2",
-        "prop3" : [1, 2, 3],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c"],
-        "prop6" : 4.56
-      }"""
-    ),
-    eventTime = DateTime.now
-  )
-  val r6 = Event(
-    event = "my_event6",
-    entityType = "my_entity_type6",
-    entityId = "my_entity_id6",
-    targetEntityType = Some("my_target_entity_type6"),
-    targetEntityId = Some("my_target_entity_id6"),
-    properties = DataMap(
-      """{
-        "prop1" : 6,
-        "prop2" : "value2",
-        "prop3" : [6, 7, 8],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c"],
-        "prop6" : 4.56
-      }"""
-    ),
-    eventTime = DateTime.now
-  )
-
-  // timezone
-  val tz1 = Event(
-    event = "my_event",
-    entityType = "my_entity_type",
-    entityId = "my_entity_id0",
-    targetEntityType = Some("my_target_entity_type"),
-    targetEntityId = Some("my_target_entity_id"),
-    properties = DataMap(
-      """{
-        "prop1" : 1,
-        "prop2" : "value2",
-        "prop3" : [1, 2, 3],
-        "prop4" : true,
-        "prop5" : ["a", "b", "c"],
-        "prop6" : 4.56
-      }"""
-    ),
-    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
-    prId = Some("my_prid")
-  )
-
-  val tz2 = Event(
-    event = "my_event",
-    entityType = "my_entity_type",
-    entityId = "my_entity_id1",
-    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
-    prId = Some("my_prid")
-  )
-
-  val tz3 = Event(
-    event = "my_event",
-    entityType = "my_entity_type",
-    entityId = "my_entity_id2",
-    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
-    prId = Some("my_prid")
-  )
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala b/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
deleted file mode 100644
index 4009e0f..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks
-
-import org.specs2.execute.Result
-import org.specs2.mutable._
-
-import org.json4s.JObject
-import org.json4s.DefaultFormats
-import org.json4s.native.JsonMethods.parse
-import org.json4s.native.Serialization.write
-
-/** TestUtil for JsonConnector */
-trait ConnectorTestUtil extends Specification {
-
-  implicit val formats = DefaultFormats
-
-  def check(connector: JsonConnector, original: String, event: String): Result = {
-    val originalJson = parse(original).asInstanceOf[JObject]
-    val eventJson = parse(event).asInstanceOf[JObject]
-    // write and parse back to discard any JNothing field
-    val result = parse(write(connector.toEventJson(originalJson))).asInstanceOf[JObject]
-    result.obj must containTheSameElementsAs(eventJson.obj)
-  }
-
-  def check(connector: FormConnector, original: Map[String, String], event: String) = {
-
-    val eventJson = parse(event).asInstanceOf[JObject]
-    // write and parse back to discard any JNothing field
-    val result = parse(write(connector.toEventJson(original))).asInstanceOf[JObject]
-
-    result.obj must containTheSameElementsAs(eventJson.obj)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
deleted file mode 100644
index 7f6ad8f..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.exampleform
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-/** Test the ExampleFormConnector */
-class ExampleFormConnectorSpec extends Specification with ConnectorTestUtil {
-
-  "ExampleFormConnector" should {
-
-    "convert userAction to Event JSON" in {
-      // webhooks input
-      val userAction = Map(
-        "type" -> "userAction",
-        "userId" -> "as34smg4",
-        "event" -> "do_something",
-        "context[ip]" -> "24.5.68.47", // optional
-        "context[prop1]" -> "2.345", // optional
-        "context[prop2]" -> "value1", // optional
-        "anotherProperty1" -> "100",
-        "anotherProperty2"-> "optional1", // optional
-        "timestamp" -> "2015-01-02T00:30:12.984Z"
-      )
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "properties": {
-            "context": {
-              "ip": "24.5.68.47",
-              "prop1": 2.345
-              "prop2": "value1"
-            },
-            "anotherProperty1": 100,
-            "anotherProperty2": "optional1"
-          }
-          "eventTime": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      check(ExampleFormConnector, userAction, expected)
-    }
-
-    "convert userAction without optional fields to Event JSON" in {
-      // webhooks input
-      val userAction = Map(
-        "type" -> "userAction",
-        "userId" -> "as34smg4",
-        "event" -> "do_something",
-        "anotherProperty1" -> "100",
-        "timestamp" -> "2015-01-02T00:30:12.984Z"
-      )
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "properties": {
-            "anotherProperty1": 100,
-          }
-          "eventTime": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      check(ExampleFormConnector, userAction, expected)
-    }
-
-    "convert userActionItem to Event JSON" in {
-      // webhooks input
-      val userActionItem = Map(
-        "type" -> "userActionItem",
-        "userId" -> "as34smg4",
-        "event" -> "do_something_on",
-        "itemId" -> "kfjd312bc",
-        "context[ip]" -> "1.23.4.56",
-        "context[prop1]" -> "2.345",
-        "context[prop2]" -> "value1",
-        "anotherPropertyA" -> "4.567", // optional
-        "anotherPropertyB" -> "false", // optional
-        "timestamp" -> "2015-01-15T04:20:23.567Z"
-      )
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something_on",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "targetEntityType": "item",
-          "targetEntityId": "kfjd312bc"
-          "properties": {
-            "context": {
-              "ip": "1.23.4.56",
-              "prop1": 2.345
-              "prop2": "value1"
-            },
-            "anotherPropertyA": 4.567
-            "anotherPropertyB": false
-          }
-          "eventTime": "2015-01-15T04:20:23.567Z"
-        }
-      """
-
-      check(ExampleFormConnector, userActionItem, expected)
-    }
-
-    "convert userActionItem without optional fields to Event JSON" in {
-      // webhooks input
-      val userActionItem = Map(
-        "type" -> "userActionItem",
-        "userId" -> "as34smg4",
-        "event" -> "do_something_on",
-        "itemId" -> "kfjd312bc",
-        "context[ip]" -> "1.23.4.56",
-        "context[prop1]" -> "2.345",
-        "context[prop2]" -> "value1",
-        "timestamp" -> "2015-01-15T04:20:23.567Z"
-      )
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something_on",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "targetEntityType": "item",
-          "targetEntityId": "kfjd312bc"
-          "properties": {
-            "context": {
-              "ip": "1.23.4.56",
-              "prop1": 2.345
-              "prop2": "value1"
-            }
-          }
-          "eventTime": "2015-01-15T04:20:23.567Z"
-        }
-      """
-
-      check(ExampleFormConnector, userActionItem, expected)
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
deleted file mode 100644
index bdf1cc4..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.examplejson
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-/** Test the ExampleJsonConnector */
-class ExampleJsonConnectorSpec extends Specification with ConnectorTestUtil {
-
-  "ExampleJsonConnector" should {
-
-    "convert userAction to Event JSON" in {
-      // webhooks input
-      val userAction = """
-        {
-          "type": "userAction"
-          "userId": "as34smg4",
-          "event": "do_something",
-          "context": {
-            "ip": "24.5.68.47",
-            "prop1": 2.345
-            "prop2": "value1"
-          },
-          "anotherProperty1": 100,
-          "anotherProperty2": "optional1",
-          "timestamp": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "properties": {
-            "context": {
-              "ip": "24.5.68.47",
-              "prop1": 2.345
-              "prop2": "value1"
-            },
-            "anotherProperty1": 100,
-            "anotherProperty2": "optional1"
-          }
-          "eventTime": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      check(ExampleJsonConnector, userAction, expected)
-    }
-
-    "convert userAction without optional field to Event JSON" in {
-      // webhooks input
-      val userAction = """
-        {
-          "type": "userAction"
-          "userId": "as34smg4",
-          "event": "do_something",
-          "anotherProperty1": 100,
-          "timestamp": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "properties": {
-            "anotherProperty1": 100,
-          }
-          "eventTime": "2015-01-02T00:30:12.984Z"
-        }
-      """
-
-      check(ExampleJsonConnector, userAction, expected)
-    }
-
-    "convert userActionItem to Event JSON" in {
-      // webhooks input
-      val userActionItem = """
-        {
-          "type": "userActionItem"
-          "userId": "as34smg4",
-          "event": "do_something_on",
-          "itemId": "kfjd312bc",
-          "context": {
-            "ip": "1.23.4.56",
-            "prop1": 2.345
-            "prop2": "value1"
-          },
-          "anotherPropertyA": 4.567
-          "anotherPropertyB": false
-          "timestamp": "2015-01-15T04:20:23.567Z"
-      }
-      """
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something_on",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "targetEntityType": "item",
-          "targetEntityId": "kfjd312bc"
-          "properties": {
-            "context": {
-              "ip": "1.23.4.56",
-              "prop1": 2.345
-              "prop2": "value1"
-            },
-            "anotherPropertyA": 4.567
-            "anotherPropertyB": false
-          }
-          "eventTime": "2015-01-15T04:20:23.567Z"
-        }
-      """
-
-      check(ExampleJsonConnector, userActionItem, expected)
-    }
-
-    "convert userActionItem without optional fields to Event JSON" in {
-      // webhooks input
-      val userActionItem = """
-        {
-          "type": "userActionItem"
-          "userId": "as34smg4",
-          "event": "do_something_on",
-          "itemId": "kfjd312bc",
-          "context": {
-            "ip": "1.23.4.56",
-            "prop1": 2.345
-            "prop2": "value1"
-          }
-          "timestamp": "2015-01-15T04:20:23.567Z"
-      }
-      """
-
-      // expected converted Event JSON
-      val expected = """
-        {
-          "event": "do_something_on",
-          "entityType": "user",
-          "entityId": "as34smg4",
-          "targetEntityType": "item",
-          "targetEntityId": "kfjd312bc"
-          "properties": {
-            "context": {
-              "ip": "1.23.4.56",
-              "prop1": 2.345
-              "prop2": "value1"
-            }
-          }
-          "eventTime": "2015-01-15T04:20:23.567Z"
-        }
-      """
-
-      check(ExampleJsonConnector, userActionItem, expected)
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
deleted file mode 100644
index 56484c2..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.mailchimp
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-class MailChimpConnectorSpec extends Specification with ConnectorTestUtil {
-
-  // TOOD: test other events
-  // TODO: test different optional fields
-
-  "MailChimpConnector" should {
-
-    "convert subscribe to event JSON" in {
-
-      val subscribe = Map(
-        "type" -> "subscribe",
-        "fired_at" -> "2009-03-26 21:35:57",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
-        "data[ip_opt]" -> "10.20.10.30",
-        "data[ip_signup]" -> "10.20.10.30"
-      )
-
-      val expected = """
-        {
-          "event" : "subscribe",
-          "entityType" : "user",
-          "entityId" : "8a25ff1d98",
-          "targetEntityType" : "list",
-          "targetEntityId" : "a6b5da1054",
-          "properties" : {
-            "email" : "api@mailchimp.com",
-            "email_type" : "html",
-            "merges" : {
-              "EMAIL" : "api@mailchimp.com",
-              "FNAME" : "MailChimp",
-              "LNAME" : "API"
-              "INTERESTS" : "Group1,Group2"
-            },
-            "ip_opt" : "10.20.10.30",
-            "ip_signup" : "10.20.10.30"
-          },
-          "eventTime" : "2009-03-26T21:35:57.000Z"
-        }
-      """
-
-      check(MailChimpConnector, subscribe, expected)
-    }
-
-    //check unsubscribe to event Json
-    "convert unsubscribe to event JSON" in {
-
-      val unsubscribe = Map(
-        "type" -> "unsubscribe",
-        "fired_at" -> "2009-03-26 21:40:57",
-        "data[action]" -> "unsub",
-        "data[reason]" -> "manual",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api+unsub@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api+unsub@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", //optional 
-        "data[ip_opt]" -> "10.20.10.30",
-        "data[campaign_id]" -> "cb398d21d2"
-      )
-
-      val expected = """
-        {
-          "event" : "unsubscribe",
-          "entityType" : "user",
-          "entityId" : "8a25ff1d98",
-          "targetEntityType" : "list",
-          "targetEntityId" : "a6b5da1054",
-          "properties" : {
-            "action" : "unsub",
-            "reason" : "manual",
-            "email" : "api+unsub@mailchimp.com",
-            "email_type" : "html",
-            "merges" : {
-              "EMAIL" : "api+unsub@mailchimp.com",
-              "FNAME" : "MailChimp",
-              "LNAME" : "API"
-              "INTERESTS" : "Group1,Group2"
-            },
-            "ip_opt" : "10.20.10.30",
-            "campaign_id" : "cb398d21d2"
-          },
-          "eventTime" : "2009-03-26T21:40:57.000Z"
-        }
-      """
-
-      check(MailChimpConnector, unsubscribe, expected)
-    }
-
-    //check profile update to event Json 
-    "convert profile update to event JSON" in {
-
-      val profileUpdate = Map(
-        "type" -> "profile",
-        "fired_at" -> "2009-03-26 21:31:21",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
-        "data[ip_opt]" -> "10.20.10.30"
-      )
-
-      val expected = """
-        {
-          "event" : "profile",
-          "entityType" : "user",
-          "entityId" : "8a25ff1d98",
-          "targetEntityType" : "list",
-          "targetEntityId" : "a6b5da1054",
-          "properties" : {
-            "email" : "api@mailchimp.com",
-            "email_type" : "html",
-            "merges" : {
-              "EMAIL" : "api@mailchimp.com",
-              "FNAME" : "MailChimp",
-              "LNAME" : "API"
-              "INTERESTS" : "Group1,Group2"
-            },
-            "ip_opt" : "10.20.10.30"
-          },
-          "eventTime" : "2009-03-26T21:31:21.000Z"
-        }
-      """
-
-      check(MailChimpConnector, profileUpdate, expected)
-    }
-
-    //check email update to event Json 
-    "convert email update to event JSON" in {
-
-      val emailUpdate = Map(
-        "type" -> "upemail",
-        "fired_at" -> "2009-03-26 22:15:09",
-        "data[list_id]" -> "a6b5da1054",
-        "data[new_id]" -> "51da8c3259",
-        "data[new_email]" -> "api+new@mailchimp.com",
-        "data[old_email]" -> "api+old@mailchimp.com"
-      )
-
-      val expected = """
-        {
-          "event" : "upemail",
-          "entityType" : "user",
-          "entityId" : "51da8c3259",
-          "targetEntityType" : "list",
-          "targetEntityId" : "a6b5da1054",
-          "properties" : {
-            "new_email" : "api+new@mailchimp.com",
-            "old_email" : "api+old@mailchimp.com"
-          },
-          "eventTime" : "2009-03-26T22:15:09.000Z"
-        }
-      """
-
-      check(MailChimpConnector, emailUpdate, expected)
-    }
-
-    //check cleaned email to event Json 
-    "convert cleaned email to event JSON" in {
-
-      val cleanedEmail = Map(
-        "type" -> "cleaned",
-        "fired_at" -> "2009-03-26 22:01:00",
-        "data[list_id]" -> "a6b5da1054",
-        "data[campaign_id]" -> "4fjk2ma9xd",
-        "data[reason]" -> "hard",
-        "data[email]" -> "api+cleaned@mailchimp.com"
-      )
-
-      val expected = """
-        {
-          "event" : "cleaned",
-          "entityType" : "list",
-          "entityId" : "a6b5da1054",
-          "properties" : {
-            "campaignId" : "4fjk2ma9xd",
-            "reason" : "hard",
-            "email" : "api+cleaned@mailchimp.com"
-          },
-          "eventTime" : "2009-03-26T22:01:00.000Z"
-        }
-      """
-
-      check(MailChimpConnector, cleanedEmail, expected)
-    }
-
-    //check campaign sending status to event Json 
-    "convert campaign sending status to event JSON" in {
-
-      val campaign = Map(
-        "type" -> "campaign",
-        "fired_at" -> "2009-03-26 22:15:09",
-        "data[id]" -> "5aa2102003",
-        "data[subject]" -> "Test Campaign Subject",
-        "data[status]" -> "sent",
-        "data[reason]" -> "",
-        "data[list_id]" -> "a6b5da1054"
-      )
-
-      val expected = """
-        {
-          "event" : "campaign",
-          "entityType" : "campaign",
-          "entityId" : "5aa2102003",
-          "targetEntityType" : "list",
-          "targetEntityId" : "a6b5da1054",
-          "properties" : {
-            "subject" : "Test Campaign Subject",
-            "status" : "sent",
-            "reason" : ""
-          },
-          "eventTime" : "2009-03-26T22:15:09.000Z"
-        }
-      """
-
-      check(MailChimpConnector, campaign, expected)
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
deleted file mode 100644
index d7587cd..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.segmentio
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-class SegmentIOConnectorSpec extends Specification with ConnectorTestUtil {
-
-  // TODO: test different optional fields
-
-  val commonFields =
-    s"""
-       |  "anonymous_id": "id",
-       |  "sent_at": "sendAt",
-       |  "version": "2",
-     """.stripMargin
-
-  "SegmentIOConnector" should {
-
-    "convert group with context to event JSON" in {
-      val context =
-        """
-          |  "context": {
-          |    "app": {
-          |      "name": "InitechGlobal",
-          |      "version": "545",
-          |      "build": "3.0.1.545"
-          |    },
-          |    "campaign": {
-          |      "name": "TPS Innovation Newsletter",
-          |      "source": "Newsletter",
-          |      "medium": "email",
-          |      "term": "tps reports",
-          |      "content": "image link"
-          |    },
-          |    "device": {
-          |      "id": "B5372DB0-C21E-11E4-8DFC-AA07A5B093DB",
-          |      "advertising_id": "7A3CBEA0-BDF5-11E4-8DFC-AA07A5B093DB",
-          |      "ad_tracking_enabled": true,
-          |      "manufacturer": "Apple",
-          |      "model": "iPhone7,2",
-          |      "name": "maguro",
-          |      "type": "ios",
-          |      "token": "ff15bc0c20c4aa6cd50854ff165fd265c838e5405bfeb9571066395b8c9da449"
-          |    },
-          |    "ip": "8.8.8.8",
-          |    "library": {
-          |      "name": "analytics-ios",
-          |      "version": "1.8.0"
-          |    },
-          |    "network": {
-          |      "bluetooth": false,
-          |      "carrier": "T-Mobile NL",
-          |      "cellular": true,
-          |      "wifi": false
-          |    },
-          |    "location": {
-          |      "city": "San Francisco",
-          |      "country": "United States",
-          |      "latitude": 40.2964197,
-          |      "longitude": -76.9411617,
-          |      "speed": 0
-          |    },
-          |    "os": {
-          |      "name": "iPhone OS",
-          |      "version": "8.1.3"
-          |    },
-          |    "referrer": {
-          |      "id": "ABCD582CDEFFFF01919",
-          |      "type": "dataxu"
-          |    },
-          |    "screen": {
-          |      "width": 320,
-          |      "height": 568,
-          |      "density": 2
-          |    },
-          |    "timezone": "Europe/Amsterdam",
-          |    "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
-          |  }
-        """.stripMargin
-
-      val group =
-        s"""
-           |{ $commonFields
-            |  "type": "group",
-            |  "group_id": "groupId",
-            |  "user_id": "userIdValue",
-            |  "timestamp" : "2012-12-02T00:30:08.276Z",
-            |  "traits": {
-            |    "name": "groupName",
-            |    "employees": 329,
-            |  },
-            |  $context
-            |}
-        """.stripMargin
-
-      val expected =
-        s"""
-          |{
-          |  "event": "group",
-          |  "entityType": "user",
-          |  "entityId": "userIdValue",
-          |  "properties": {
-          |    $context,
-          |    "group_id": "groupId",
-          |    "traits": {
-          |      "name": "groupName",
-          |      "employees": 329
-          |    },
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, group, expected)
-    }
-
-    "convert group to event JSON" in {
-      val group =
-        s"""
-          |{ $commonFields
-          |  "type": "group",
-          |  "group_id": "groupId",
-          |  "user_id": "userIdValue",
-          |  "timestamp" : "2012-12-02T00:30:08.276Z",
-          |  "traits": {
-          |    "name": "groupName",
-          |    "employees": 329,
-          |  }
-          |}
-        """.stripMargin
-
-      val expected =
-        """
-          |{
-          |  "event": "group",
-          |  "entityType": "user",
-          |  "entityId": "userIdValue",
-          |  "properties": {
-          |    "group_id": "groupId",
-          |    "traits": {
-          |      "name": "groupName",
-          |      "employees": 329
-          |    }
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, group, expected)
-    }
-
-    "convert screen to event JSON" in {
-      val screen =
-        s"""
-          |{ $commonFields
-          |  "type": "screen",
-          |  "name": "screenName",
-          |  "user_id": "userIdValue",
-          |  "timestamp" : "2012-12-02T00:30:08.276Z",
-          |  "properties": {
-          |    "variation": "screenVariation"
-          |  }
-          |}
-        """.stripMargin
-
-      val expected =
-        """
-          |{
-          |  "event": "screen",
-          |  "entityType": "user",
-          |  "entityId": "userIdValue",
-          |  "properties": {
-          |    "properties": {
-          |      "variation": "screenVariation"
-          |    },
-          |    "name": "screenName"
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, screen, expected)
-    }
-
-    "convert page to event JSON" in {
-      val page =
-       s"""
-          |{ $commonFields
-          |  "type": "page",
-          |  "name": "pageName",
-          |  "user_id": "userIdValue",
-          |  "timestamp" : "2012-12-02T00:30:08.276Z",
-          |  "properties": {
-          |    "title": "pageTitle",
-          |    "url": "pageUrl"
-          |  }
-          |}
-        """.stripMargin
-
-      val expected =
-        """
-          |{
-          |  "event": "page",
-          |  "entityType": "user",
-          |  "entityId": "userIdValue",
-          |  "properties": {
-          |    "properties": {
-          |      "title": "pageTitle",
-          |      "url": "pageUrl"
-          |    },
-          |    "name": "pageName"
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, page, expected)
-    }
-
-    "convert alias to event JSON" in {
-      val alias =
-        s"""
-          |{ $commonFields
-          |  "type": "alias",
-          |  "previous_id": "previousIdValue",
-          |  "user_id": "userIdValue",
-          |  "timestamp" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      val expected =
-        """
-          |{
-          |  "event": "alias",
-          |  "entityType": "user",
-          |  "entityId": "userIdValue",
-          |  "properties": {
-          |    "previous_id" : "previousIdValue"
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, alias, expected)
-    }
-
-    "convert track to event JSON" in {
-      val track =
-       s"""
-          |{ $commonFields
-          |  "user_id": "some_user_id",
-          |  "type": "track",
-          |  "event": "Registered",
-          |  "timestamp" : "2012-12-02T00:30:08.276Z",
-          |  "properties": {
-          |    "plan": "Pro Annual",
-          |    "accountType" : "Facebook"
-          |  }
-          |}
-        """.stripMargin
-
-      val expected =
-        """
-          |{
-          |  "event": "track",
-          |  "entityType": "user",
-          |  "entityId": "some_user_id",
-          |  "properties": {
-          |    "event": "Registered",
-          |    "properties": {
-          |      "plan": "Pro Annual",
-          |      "accountType": "Facebook"
-          |    }
-          |  },
-          |  "eventTime" : "2012-12-02T00:30:08.276Z"
-          |}
-        """.stripMargin
-
-      check(SegmentIOConnector, track, expected)
-    }
-
-    "convert identify to event JSON" in {
-      val identify = s"""
-        { $commonFields
-          "type"      : "identify",
-          "user_id"    : "019mr8mf4r",
-          "traits"    : {
-              "email"            : "achilles@segment.com",
-              "name"             : "Achilles",
-              "subscription_plan" : "Premium",
-              "friendCount"      : 29
-          },
-          "timestamp" : "2012-12-02T00:30:08.276Z"
-        }
-      """
-
-      val expected = """
-        {
-          "event" : "identify",
-          "entityType": "user",
-          "entityId" : "019mr8mf4r",
-          "properties" : {
-            "traits" : {
-              "email"            : "achilles@segment.com",
-              "name"             : "Achilles",
-              "subscription_plan" : "Premium",
-              "friendCount"      : 29
-            }
-          },
-          "eventTime" : "2012-12-02T00:30:08.276Z"
-        }
-      """
-
-      check(SegmentIOConnector, identify, expected)
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
new file mode 100644
index 0000000..62fd89c
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
@@ -0,0 +1,68 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Storage
+
+import akka.testkit.TestProbe
+import akka.actor.ActorSystem
+import akka.actor.Props
+
+import spray.http.HttpEntity
+import spray.http.HttpResponse
+import spray.http.ContentTypes
+import spray.httpx.RequestBuilding.Get
+
+import org.specs2.mutable.Specification
+
+class EventServiceSpec extends Specification {
+
+  val system = ActorSystem("EventServiceSpecSystem")
+
+  val eventClient = Storage.getLEvents()
+  val accessKeysClient = Storage.getMetaDataAccessKeys()
+  val channelsClient = Storage.getMetaDataChannels()
+  
+  val eventServiceActor = system.actorOf(
+    Props(
+      new EventServiceActor(
+        eventClient,
+        accessKeysClient,
+        channelsClient,
+        EventServerConfig()
+      )
+    )
+  )
+
+  "GET / request" should {
+    "properly produce OK HttpResponses" in {
+      val probe = TestProbe()(system)
+      probe.send(eventServiceActor, Get("/"))
+      probe.expectMsg(
+        HttpResponse(
+          200,
+          HttpEntity(
+            contentType = ContentTypes.`application/json`,
+            string = """{"status":"alive"}"""
+          )
+        )
+      )
+      success
+    }
+  }
+
+  step(system.shutdown())
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
new file mode 100644
index 0000000..bae0f0b
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
@@ -0,0 +1,175 @@
+package io.prediction.data.api
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import io.prediction.data.storage._
+import org.joda.time.DateTime
+import org.specs2.mutable.Specification
+import spray.http.HttpHeaders.RawHeader
+import spray.http.{ContentTypes, HttpEntity, HttpResponse}
+import spray.httpx.RequestBuilding._
+import sun.misc.BASE64Encoder
+
+import scala.concurrent.{Future, ExecutionContext}
+
+class SegmentIOAuthSpec extends Specification {
+
+  val system = ActorSystem("EventServiceSpecSystem")
+  sequential
+  isolated
+  val eventClient = new LEvents {
+    override def init(appId: Int, channelId: Option[Int]): Boolean = true
+
+    override def futureInsert(event: Event, appId: Int, channelId: Option[Int])
+        (implicit ec: ExecutionContext): Future[String] =
+      Future successful "event_id"
+
+    override def futureFind(
+      appId: Int, channelId: Option[Int], startTime: Option[DateTime],
+      untilTime: Option[DateTime], entityType: Option[String],
+      entityId: Option[String], eventNames: Option[Seq[String]],
+      targetEntityType: Option[Option[String]],
+      targetEntityId: Option[Option[String]], limit: Option[Int],
+      reversed: Option[Boolean])
+        (implicit ec: ExecutionContext): Future[Iterator[Event]] =
+      Future successful List.empty[Event].iterator
+
+    override def futureGet(eventId: String, appId: Int, channelId: Option[Int])
+        (implicit ec: ExecutionContext): Future[Option[Event]] =
+      Future successful None
+
+    override def remove(appId: Int, channelId: Option[Int]): Boolean = true
+
+    override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])
+        (implicit ec: ExecutionContext): Future[Boolean] =
+      Future successful true
+
+    override def close(): Unit = {}
+  }
+  val appId = 0
+  val accessKeysClient = new AccessKeys {
+    override def insert(k: AccessKey): Option[String] = null
+    override def getByAppid(appid: Int): Seq[AccessKey] = null
+    override def update(k: AccessKey): Unit = {}
+    override def delete(k: String): Unit = {}
+    override def getAll(): Seq[AccessKey] = null
+
+    override def get(k: String): Option[AccessKey] =
+      k match {
+        case "abc" ⇒ Some(AccessKey(k, appId, Seq.empty))
+        case _ ⇒ None
+      }
+  }
+
+  val channelsClient = Storage.getMetaDataChannels()
+  val eventServiceActor = system.actorOf(
+    Props(
+      new EventServiceActor(
+        eventClient,
+        accessKeysClient,
+        channelsClient,
+        EventServerConfig()
+      )
+    )
+  )
+
+  val base64Encoder = new BASE64Encoder
+
+  "Event Service" should {
+
+    "reject with CredentialsRejected with invalid credentials" in {
+      val accessKey = "abc123:"
+      val probe = TestProbe()(system)
+      probe.send(
+        eventServiceActor,
+        Post("/webhooks/segmentio.json")
+          .withHeaders(
+            List(
+              RawHeader("Authorization", s"Basic $accessKey")
+            )
+          )
+      )
+      probe.expectMsg(
+        HttpResponse(
+          401,
+          HttpEntity(
+            contentType = ContentTypes.`application/json`,
+            string = """{"message":"Invalid accessKey."}"""
+          )
+        )
+      )
+      success
+    }
+
+    "reject with CredentialsMissed without credentials" in {
+      val probe = TestProbe()(system)
+      probe.send(
+        eventServiceActor,
+        Post("/webhooks/segmentio.json")
+      )
+      probe.expectMsg(
+        HttpResponse(
+          401,
+          HttpEntity(
+            contentType = ContentTypes.`application/json`,
+            string = """{"message":"Missing accessKey."}"""
+          )
+        )
+      )
+      success
+    }
+
+    "process SegmentIO identity request properly" in {
+      val jsonReq =
+        """
+          |{
+          |  "anonymous_id": "507f191e810c19729de860ea",
+          |  "channel": "browser",
+          |  "context": {
+          |    "ip": "8.8.8.8",
+          |    "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
+          |  },
+          |  "message_id": "022bb90c-bbac-11e4-8dfc-aa07a5b093db",
+          |  "timestamp": "2015-02-23T22:28:55.387Z",
+          |  "sent_at": "2015-02-23T22:28:55.111Z",
+          |  "traits": {
+          |    "name": "Peter Gibbons",
+          |    "email": "peter@initech.com",
+          |    "plan": "premium",
+          |    "logins": 5
+          |  },
+          |  "type": "identify",
+          |  "user_id": "97980cfea0067",
+          |  "version": "2"
+          |}
+        """.stripMargin
+
+      val accessKey = "abc:"
+      val accessKeyEncoded = base64Encoder.encodeBuffer(accessKey.getBytes)
+      val probe = TestProbe()(system)
+      probe.send(
+        eventServiceActor,
+        Post(
+          "/webhooks/segmentio.json",
+          HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
+        ).withHeaders(
+            List(
+              RawHeader("Authorization", s"Basic $accessKeyEncoded")
+            )
+          )
+      )
+      probe.expectMsg(
+        HttpResponse(
+          201,
+          HttpEntity(
+            contentType = ContentTypes.`application/json`,
+            string = """{"eventId":"event_id"}"""
+          )
+        )
+      )
+      success
+    }
+  }
+
+  step(system.shutdown())
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
new file mode 100644
index 0000000..c98c882
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
@@ -0,0 +1,196 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+
+class BiMapSpec extends Specification {
+
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+  val sc = new SparkContext("local[4]", "BiMapSpec test")
+
+  "BiMap created with map" should {
+
+    val keys = Seq(1, 4, 6)
+    val orgValues = Seq(2, 5, 7)
+    val org = keys.zip(orgValues).toMap
+    val bi = BiMap(org)
+
+    "return correct values for each key of original map" in {
+      val biValues = keys.map(k => bi(k))
+
+      biValues must beEqualTo(orgValues)
+    }
+
+    "get return Option[V]" in {
+      val checkKeys = keys ++ Seq(12345)
+      val biValues = checkKeys.map(k => bi.get(k))
+      val expected = orgValues.map(Some(_)) ++ Seq(None)
+
+      biValues must beEqualTo(expected)
+    }
+
+    "getOrElse return value for each key of original map" in {
+      val biValues = keys.map(k => bi.getOrElse(k, -1))
+
+      biValues must beEqualTo(orgValues)
+    }
+
+    "getOrElse return default values for invalid key" in {
+      val keys = Seq(999, -1, -2)
+      val defaults = Seq(1234, 5678, 987)
+      val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) }
+
+      biValues must beEqualTo(defaults)
+    }
+
+    "contains() returns true/false correctly" in {
+      val checkKeys = keys ++ Seq(12345)
+      val biValues = checkKeys.map(k => bi.contains(k))
+      val expected = orgValues.map(_ => true) ++ Seq(false)
+
+      biValues must beEqualTo(expected)
+    }
+
+    "same size as original map" in {
+      (bi.size) must beEqualTo(org.size)
+    }
+
+    "take(2) returns BiMap of size 2" in {
+      bi.take(2).size must beEqualTo(2)
+    }
+
+    "toMap contain same element as original map" in {
+      (bi.toMap) must beEqualTo(org)
+    }
+
+    "toSeq contain same element as original map" in {
+      (bi.toSeq) must containTheSameElementsAs(org.toSeq)
+    }
+
+    "inverse and return correct keys for each values of original map" in {
+      val biKeys = orgValues.map(v => bi.inverse(v))
+      biKeys must beEqualTo(keys)
+    }
+
+    "inverse with same size" in {
+      bi.inverse.size must beEqualTo(org.size)
+    }
+
+    "inverse's inverse reference back to the same original object" in {
+      // NOTE: reference equality
+      bi.inverse.inverse == bi
+    }
+  }
+
+  "BiMap created with duplicated values in map" should {
+    val dup = Map(1 -> 2, 4 -> 7, 6 -> 7)
+    "return IllegalArgumentException" in {
+      BiMap(dup) must throwA[IllegalArgumentException]
+    }
+  }
+
+  "BiMap.stringLong and stringInt" should {
+
+    "create BiMap from set of string" in {
+      val keys = Set("a", "b", "foo", "bar")
+      val values: Seq[Long] = Seq(0, 1, 2, 3)
+
+      val bi = BiMap.stringLong(keys)
+      val biValues = keys.map(k => bi(k))
+
+      val biInt = BiMap.stringInt(keys)
+      val valuesInt: Seq[Int] = values.map(_.toInt)
+      val biIntValues = keys.map(k => biInt(k))
+
+      biValues must containTheSameElementsAs(values) and
+        (biIntValues must containTheSameElementsAs(valuesInt))
+    }
+
+    "create BiMap from Array of unique string" in {
+      val keys = Array("a", "b", "foo", "bar")
+      val values: Seq[Long] = Seq(0, 1, 2, 3)
+
+      val bi = BiMap.stringLong(keys)
+      val biValues = keys.toSeq.map(k => bi(k))
+
+      val biInt = BiMap.stringInt(keys)
+      val valuesInt: Seq[Int] = values.map(_.toInt)
+      val biIntValues = keys.toSeq.map(k => biInt(k))
+
+      biValues must containTheSameElementsAs(values) and
+        (biIntValues must containTheSameElementsAs(valuesInt))
+    }
+
+    "not guarantee sequential index for Array with duplicated string" in {
+      val keys = Array("a", "b", "foo", "bar", "a", "b", "x")
+      val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6)
+      val values = keys.zip(dupValues).toMap.values.toSeq
+
+      val bi = BiMap.stringLong(keys)
+      val biValues = keys.toSet[String].map(k => bi(k))
+
+      val biInt = BiMap.stringInt(keys)
+      val valuesInt: Seq[Int] = values.map(_.toInt)
+      val biIntValues = keys.toSet[String].map(k => biInt(k))
+
+      biValues must containTheSameElementsAs(values) and
+        (biIntValues must containTheSameElementsAs(valuesInt))
+    }
+
+    "create BiMap from RDD[String]" in {
+
+      val keys = Seq("a", "b", "foo", "bar")
+      val values: Seq[Long] = Seq(0, 1, 2, 3)
+      val rdd = sc.parallelize(keys)
+
+      val bi = BiMap.stringLong(rdd)
+      val biValues = keys.map(k => bi(k))
+
+      val biInt = BiMap.stringInt(rdd)
+      val valuesInt: Seq[Int] = values.map(_.toInt)
+      val biIntValues = keys.map(k => biInt(k))
+
+      biValues must containTheSameElementsAs(values) and
+        (biIntValues must containTheSameElementsAs(valuesInt))
+    }
+
+    "create BiMap from RDD[String] with duplicated string" in {
+
+      val keys = Seq("a", "b", "foo", "bar", "a", "b", "x")
+      val values: Seq[Long] = Seq(0, 1, 2, 3, 4)
+      val rdd = sc.parallelize(keys)
+
+      val bi = BiMap.stringLong(rdd)
+      val biValues = keys.distinct.map(k => bi(k))
+
+      val biInt = BiMap.stringInt(rdd)
+      val valuesInt: Seq[Int] = values.map(_.toInt)
+      val biIntValues = keys.distinct.map(k => biInt(k))
+
+      biValues must containTheSameElementsAs(values) and
+        (biIntValues must containTheSameElementsAs(valuesInt))
+    }
+  }
+
+  step(sc.stop())
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
new file mode 100644
index 0000000..46ae8dd
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
@@ -0,0 +1,243 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+class DataMapSpec extends Specification {
+  // get[T] reads a property as T; getOpt[T] gives Some(value), or None for a missing key.
+  "DataMap" should {
+
+    val properties = DataMap("""
+      {
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c", "c"],
+        "prop6" : 4.56
+      }
+      """)
+
+    "get Int data" in {
+      properties.get[Int]("prop1") must beEqualTo(1)
+      properties.getOpt[Int]("prop1") must beEqualTo(Some(1))
+    }
+
+    "get String data" in {
+      properties.get[String]("prop2") must beEqualTo("value2")
+      properties.getOpt[String]("prop2") must beEqualTo(Some("value2"))
+    }
+
+    "get List of Int data" in {
+      properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3))
+      properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3)))
+    }
+
+    "get Boolean data" in {
+      properties.get[Boolean]("prop4") must beEqualTo(true)
+      properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true))
+    }
+
+    "get List of String data" in {
+      properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c"))
+      properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c")))
+    }
+    // prop5 holds "c" twice; reading it as a Set is expected to collapse the duplicate
+    "get Set of String data" in {
+      properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c"))
+      properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c")))
+    }
+
+    "get Double data" in {
+      properties.get[Double]("prop6") must beEqualTo(4.56)
+      properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56))
+    }
+    // prop9999 is not a key of the fixture above
+    "get empty optional Int data" in {
+      properties.getOpt[Int]("prop9999") must beEqualTo(None)
+    }
+
+  }
+
+  "DataMap with multi-level data" should {
+    val properties = DataMap("""
+      {
+        "context": {
+          "ip": "1.23.4.56",
+          "prop1": 2.345,
+          "prop2": "value1",
+          "prop4": [1, 2, 3]
+        },
+        "anotherPropertyA": 4.567,
+        "anotherPropertyB": false
+      }
+      """)
+    // "context" has no prop3 key, so Context.prop3 is expected to be None
+    "get case class data" in {
+      val expected = DataMapSpec.Context(
+        ip = "1.23.4.56",
+        prop1 = Some(2.345),
+        prop2 = Some("value1"),
+        prop3 = None,
+        prop4 = List(1,2,3)
+      )
+
+      properties.get[DataMapSpec.Context]("context") must beEqualTo(expected)
+    }
+
+    "get empty optional case class data" in {
+      properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None)
+    }
+
+    "get double data" in {
+      properties.get[Double]("anotherPropertyA") must beEqualTo(4.567)
+    }
+
+    "get boolean data" in {
+      properties.get[Boolean]("anotherPropertyB") must beEqualTo(false)
+    }
+  }
+
+  "DataMap extract" should {
+    // extract[T] turns the whole DataMap into one of the case classes from object DataMapSpec
+    "extract to case class object" in {
+      val properties = DataMap("""
+        {
+          "prop1" : 1,
+          "prop2" : "value2",
+          "prop3" : [1, 2, 3],
+          "prop4" : true,
+          "prop5" : ["a", "b", "c", "c"],
+          "prop6" : 4.56
+        }
+        """)
+
+      val result = properties.extract[DataMapSpec.BasicProperty]
+      val expected = DataMapSpec.BasicProperty(
+        prop1 = 1,
+        prop2 = "value2",
+        prop3 = List(1,2,3),
+        prop4 = true,
+        prop5 = List("a", "b", "c", "c"),
+        prop6 = 4.56
+      )
+
+      result must beEqualTo(expected)
+    }
+
+    "extract with optional fields" in {
+      val propertiesEmpty = DataMap("""{}""")
+      val propertiesSome = DataMap("""
+        {
+          "prop1" : 1,
+          "prop5" : ["a", "b", "c", "c"],
+          "prop6" : 4.56
+        }
+        """)
+      // {} must give None for every field; propertiesSome only sets prop1, prop5 and prop6
+      val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty]
+      val expectedEmpty = DataMapSpec.OptionProperty(
+        prop1 = None,
+        prop2 = None,
+        prop3 = None,
+        prop4 = None,
+        prop5 = None,
+        prop6 = None
+      )
+
+      val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty]
+      val expectedSome = DataMapSpec.OptionProperty(
+        prop1 = Some(1),
+        prop2 = None,
+        prop3 = None,
+        prop4 = None,
+        prop5 = Some(List("a", "b", "c", "c")),
+        prop6 = Some(4.56)
+      )
+
+      resultEmpty must beEqualTo(expectedEmpty)
+      resultSome must beEqualTo(expectedSome)
+    }
+
+    "extract to multi-level object" in {
+      val properties = DataMap("""
+        {
+          "context": {
+            "ip": "1.23.4.56",
+            "prop1": 2.345,
+            "prop2": "value1",
+            "prop4": [1, 2, 3]
+          },
+          "anotherPropertyA": 4.567,
+          "anotherPropertyB": false
+        }
+        """)
+
+      val result = properties.extract[DataMapSpec.MultiLevelProperty]
+      val expected = DataMapSpec.MultiLevelProperty(
+        context = DataMapSpec.Context(
+          ip = "1.23.4.56",
+          prop1 = Some(2.345),
+          prop2 = Some("value1"),
+          prop3 = None,
+          prop4 = List(1,2,3)
+        ),
+        anotherPropertyA = 4.567,
+        anotherPropertyB = false
+      )
+
+      result must beEqualTo(expected)
+    }
+
+  }
+}
+
+object DataMapSpec {
+  // Case classes the DataMapSpec examples read or extract DataMap content into.
+  // define this case class inside object to avoid case class name conflict with other tests
+  case class Context(
+    ip: String,
+    prop1: Option[Double],
+    prop2: Option[String],
+    prop3: Option[Int],
+    prop4: List[Int]
+  )
+  // Every field is required; the field names equal the JSON keys prop1..prop6 of the fixtures.
+  case class BasicProperty(
+    prop1: Int,
+    prop2: String,
+    prop3: List[Int],
+    prop4: Boolean,
+    prop5: List[String],
+    prop6: Double
+  )
+  // Same field names and value types as BasicProperty, each wrapped in Option.
+  case class OptionProperty(
+    prop1: Option[Int],
+    prop2: Option[String],
+    prop3: Option[List[Int]],
+    prop4: Option[Boolean],
+    prop5: Option[List[String]],
+    prop6: Option[Double]
+  )
+  // Nested shape: one Context value plus two scalar properties next to it.
+  case class MultiLevelProperty(
+    context: Context,
+    anotherPropertyA: Double,
+    anotherPropertyB: Boolean
+  )
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
new file mode 100644
index 0000000..8c02186
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
@@ -0,0 +1,103 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+import org.joda.time.DateTime
+
+class LEventAggregatorSpec extends Specification with TestEvents {
+  // Fixture names used below (u1e1..u1e5, u1ed, u2e1..u2e3, u1, u2, u1BaseTime, ...) are not
+  // declared in this class; presumably they are supplied by the TestEvents mixin.
+
+  "LEventAggregator.aggregateProperties()" should {
+
+    "aggregate two entities' properties as DataMap correctly" in {
+      // events of both users, interleaved
+      val shuffled = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+      val aggregated: Map[String, DataMap] =
+        LEventAggregator.aggregateProperties(shuffled.toIterator)
+
+      aggregated must_== Map(
+        "u1" -> DataMap(u1),
+        "u2" -> DataMap(u2)
+      )
+    }
+
+    "aggregate two entities' properties as PropertyMap correctly" in {
+      val shuffled = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+      val aggregated: Map[String, PropertyMap] =
+        LEventAggregator.aggregateProperties(shuffled.toIterator)
+
+      // each expected PropertyMap is built from the properties plus a base and a last time
+      aggregated must_== Map(
+        "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+      )
+    }
+
+    "aggregate deleted entity correctly" in {
+      // the stream used above with u1ed added; only u2 is expected in the result
+      val shuffled = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+      val aggregated = LEventAggregator.aggregateProperties(shuffled.toIterator)
+
+      aggregated must_== Map(
+        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+      )
+    }
+  }
+
+  // single-entity variant: the result is an Option instead of a Map keyed by "u1"/"u2"
+  "LEventAggregator.aggregatePropertiesSingle()" should {
+
+    "aggregate single entity properties as DataMap correctly" in {
+      // u1 events only
+      val u1Events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
+
+      val aggregated: Option[DataMap] =
+        LEventAggregator.aggregatePropertiesSingle(u1Events.toIterator)
+
+      aggregated must_== Some(DataMap(u1))
+    }
+
+    "aggregate single entity properties as PropertyMap correctly" in {
+      val u1Events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
+
+      val aggregated: Option[PropertyMap] =
+        LEventAggregator.aggregatePropertiesSingle(u1Events.toIterator)
+
+      // the expected value is built from u1 plus u1BaseTime and u1LastTime
+      aggregated must_== Some(PropertyMap(u1, u1BaseTime, u1LastTime))
+    }
+
+    "aggregate deleted entity correctly" in {
+      // put the delete event in the middle
+      val u1Events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5)
+
+      val aggregated =
+        LEventAggregator.aggregatePropertiesSingle(u1Events.toIterator)
+
+      aggregated must_== None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
new file mode 100644
index 0000000..0639613
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
@@ -0,0 +1,245 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2._
+import org.specs2.specification.Step
+
+class LEventsSpec extends Specification with TestEvents {
+  def is = s2"""
+
+  PredictionIO Storage LEvents Specification
+
+    Events can be implemented by:
+    - HBLEvents ${hbEvents}
+    - JDBCLEvents ${jdbcLEvents}
+
+  """
+  // HBase-backed run; its last fragment is a Step that drops the test namespace
+  def hbEvents = sequential ^ s2"""
+
+    HBLEvents should
+    - behave like any LEvents implementation ${events(hbDO)}
+    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+  """
+  // JDBC-backed run of the same examples (no cleanup fragment is listed for it)
+  def jdbcLEvents = sequential ^ s2"""
+
+    JDBCLEvents should
+    - behave like any LEvents implementation ${events(jdbcDO)}
+
+  """
+  // app id passed to every storage call in this spec
+  val appId = 1
+  // the examples are declared sequential: later ones read back what earlier ones inserted
+  def events(eventClient: LEvents) = sequential ^ s2"""
+
+    init default ${initDefault(eventClient)}
+    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+    insert and delete by ID ${insertAndDelete(eventClient)}
+    insert test user events ${insertTestUserEvents(eventClient)}
+    find user events ${findUserEvents(eventClient)}
+    aggregate user properties ${aggregateUserProperties(eventClient)}
+    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+    init channel ${initChannel(eventClient)}
+    insert 2 events to channel ${insertChannel(eventClient)}
+    insert 1 event to channel and delete by ID  ${insertAndDeleteChannel(eventClient)}
+    find events from channel ${findChannel(eventClient)}
+    remove default ${removeDefault(eventClient)}
+    remove channel ${removeChannel(eventClient)}
+
+  """
+  // name handed to Storage.getDataObject and to the HBase namespace cleanup step
+  val dbName = "test_pio_storage_events_" + hashCode
+  def hbDO = Storage.getDataObject[LEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
+  // --- examples that pass no channel id ---
+  def initDefault(eventClient: LEvents) = {
+    eventClient.init(appId)
+  }
+
+  def insertAndGetEvents(eventClient: LEvents) = {
+    // insert r1..r3, then read each one back by the id that insert returned
+    // events from TestEvents trait
+    val listOfEvents = List(r1,r2,r3)
+
+    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
+
+    val insertedEventId: List[String] = insertResp
+
+    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
+
+    val getEvents = getResp
+
+    insertedEvent must containTheSameElementsAs(getEvents)
+  }
+  // same round trip as insertAndGetEvents, using the tz1..tz3 fixtures
+  def insertAndGetTimezone(eventClient: LEvents) = {
+    val listOfEvents = List(tz1, tz2, tz3)
+
+    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
+
+    val insertedEventId: List[String] = insertResp
+
+    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
+
+    val getEvents = getResp
+
+    insertedEvent must containTheSameElementsAs(getEvents)
+  }
+  // get must return the event before the delete and None after it
+  def insertAndDelete(eventClient: LEvents) = {
+    val eventId = eventClient.insert(r2, appId)
+
+    val resultBefore = eventClient.get(eventId, appId)
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId)
+
+    val resultAfter = eventClient.get(eventId, appId)
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+  // seeds the user events that findUserEvents expects to read back
+  def insertTestUserEvents(eventClient: LEvents) = {
+    // events from TestEvents trait
+    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    listOfEvents.foreach { eventClient.insert(_, appId) } // the returned ids are not needed here
+
+    success
+  }
+
+  def findUserEvents(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      entityType = Some("user"))
+      .toList
+      .map(e => e.copy(eventId = None)) // ignore eventID
+
+    // same events in insertTestUserEvents
+    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def aggregateUserProperties(eventClient: LEvents) = {
+
+    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user")
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateOneUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u1")
+
+    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u999999")
+
+    result must beEqualTo(None)
+  }
+  // --- examples that pass Some(channelId) ---
+  val channelId = 12
+
+  def initChannel(eventClient: LEvents) = {
+    eventClient.init(appId, Some(channelId))
+  }
+
+  def insertChannel(eventClient: LEvents) = {
+
+    // events from TestEvents trait
+    val listOfEvents = List(r4,r5)
+
+    listOfEvents.foreach( eventClient.insert(_, appId, Some(channelId)) ) // ids not needed
+
+    success
+  }
+
+  def insertAndDeleteChannel(eventClient: LEvents) = {
+
+    val eventId = eventClient.insert(r2, appId, Some(channelId))
+
+    val resultBefore = eventClient.get(eventId, appId, Some(channelId))
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
+
+    val resultAfter = eventClient.get(eventId, appId, Some(channelId))
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+
+  def findChannel(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )
+    .toList
+    .map(e => e.copy(eventId = None)) // ignore eventId
+
+    // same events in insertChannel
+    val expected = List(r4, r5)
+
+    results must containTheSameElementsAs(expected)
+  }
+  // the two remove examples are listed last in events(...)
+  def removeDefault(eventClient: LEvents) = {
+    eventClient.remove(appId)
+  }
+
+  def removeChannel(eventClient: LEvents) = {
+    eventClient.remove(appId, Some(channelId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
new file mode 100644
index 0000000..21790ad
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
@@ -0,0 +1,72 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+class PEventAggregatorSpec extends Specification with TestEvents {
+
+  // clear two Spark port system properties in this JVM, then create a local[4] context
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+
+  "PEventAggregator" should {
+
+    "aggregate two entities' properties as DataMap/PropertyMap correctly" in {
+      val shuffled = Seq(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+      val aggregated = PEventAggregator.aggregateProperties(sc.parallelize(shuffled))
+
+      // collect the aggregated result into an immutable Map
+      val byEntity = aggregated.collectAsMap.toMap
+
+      val asDataMaps = Map(
+        "u1" -> DataMap(u1),
+        "u2" -> DataMap(u2)
+      )
+      val asPropertyMaps = Map(
+        "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+      )
+
+      // the one result must equal both expected forms
+      byEntity must_== asDataMaps
+      byEntity must_== asPropertyMaps
+    }
+
+    "aggregate deleted entity correctly" in {
+      // put the delete event in middle
+      val shuffled = Seq(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)
+      val aggregated = PEventAggregator.aggregateProperties(sc.parallelize(shuffled))
+
+      val byEntity = aggregated.collectAsMap.toMap
+
+      // only u2 is expected to remain
+      byEntity must_== Map(
+        "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+      )
+    }
+
+  }
+
+  // stop the context after the examples above
+  step(sc.stop())
+}