You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/16 14:52:15 UTC

[2/7] incubator-predictionio git commit: Move storage depended tests to each storage project

Move storage depended tests to each storage project

Closes #362


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/647b480b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/647b480b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/647b480b

Branch: refs/heads/feature/xbuild
Commit: 647b480b0f0f693ec1b4d04dc9a2cf1492f0a0fe
Parents: 606be41
Author: Naoki Takezoe <ta...@gmail.com>
Authored: Wed Mar 15 09:42:30 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Wed Mar 15 09:42:30 2017 -0700

----------------------------------------------------------------------
 .../predictionio/data/storage/LEventsSpec.scala | 248 -----------------
 .../predictionio/data/storage/PEventsSpec.scala | 213 ---------------
 .../data/storage/StorageTestUtils.scala         |  45 ----
 .../data/storage/hbase/LEventsSpec.scala        | 239 +++++++++++++++++
 .../data/storage/hbase/PEventsSpec.scala        | 192 +++++++++++++
 .../data/storage/hbase/StorageTestUtils.scala   |  40 +++
 .../data/storage/hbase/TestEvents.scala         | 266 +++++++++++++++++++
 .../data/storage/jdbc/LEventsSpec.scala         | 236 ++++++++++++++++
 .../data/storage/jdbc/PEventsSpec.scala         | 193 ++++++++++++++
 .../data/storage/jdbc/StorageTestUtils.scala    |  29 ++
 .../data/storage/jdbc/TestEvents.scala          | 266 +++++++++++++++++++
 11 files changed, 1461 insertions(+), 506 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
deleted file mode 100644
index 3938072..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-class LEventsSpec extends Specification with TestEvents {
-  def is = s2"""
-
-  PredictionIO Storage LEvents Specification
-
-    Events can be implemented by:
-    - HBLEvents ${hbEvents}
-    - JDBCLEvents ${jdbcLEvents}
-
-  """
-
-  def hbEvents = sequential ^ s2"""
-
-    HBLEvents should
-    - behave like any LEvents implementation ${events(hbDO)}
-    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
-  """
-
-  def jdbcLEvents = sequential ^ s2"""
-
-    JDBCLEvents should
-    - behave like any LEvents implementation ${events(jdbcDO)}
-
-  """
-
-  val appId = 1
-
-  def events(eventClient: LEvents) = sequential ^ s2"""
-
-    init default ${initDefault(eventClient)}
-    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
-    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
-    insert and delete by ID ${insertAndDelete(eventClient)}
-    insert test user events ${insertTestUserEvents(eventClient)}
-    find user events ${findUserEvents(eventClient)}
-    aggregate user properties ${aggregateUserProperties(eventClient)}
-    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
-    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
-    init channel ${initChannel(eventClient)}
-    insert 2 events to channel ${insertChannel(eventClient)}
-    insert 1 event to channel and delete by ID  ${insertAndDeleteChannel(eventClient)}
-    find events from channel ${findChannel(eventClient)}
-    remove default ${removeDefault(eventClient)}
-    remove channel ${removeChannel(eventClient)}
-
-  """
-
-  val dbName = "test_pio_storage_events_" + hashCode
-  def hbDO = Storage.getDataObject[LEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
-
-  def initDefault(eventClient: LEvents) = {
-    eventClient.init(appId)
-  }
-
-  def insertAndGetEvents(eventClient: LEvents) = {
-
-    // events from TestEvents trait
-    val listOfEvents = List(r1,r2,r3)
-
-    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
-    val insertedEventId: List[String] = insertResp
-
-    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
-      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
-    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
-    val getEvents = getResp
-
-    insertedEvent must containTheSameElementsAs(getEvents)
-  }
-
-  def insertAndGetTimezone(eventClient: LEvents) = {
-    val listOfEvents = List(tz1, tz2, tz3)
-
-    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
-    val insertedEventId: List[String] = insertResp
-
-    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
-      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
-    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
-    val getEvents = getResp
-
-    insertedEvent must containTheSameElementsAs(getEvents)
-  }
-
-  def insertAndDelete(eventClient: LEvents) = {
-    val eventId = eventClient.insert(r2, appId)
-
-    val resultBefore = eventClient.get(eventId, appId)
-
-    val expectedBefore = r2.copy(eventId = Some(eventId))
-
-    val deleteStatus = eventClient.delete(eventId, appId)
-
-    val resultAfter = eventClient.get(eventId, appId)
-
-    (resultBefore must beEqualTo(Some(expectedBefore))) and
-    (deleteStatus must beEqualTo(true)) and
-    (resultAfter must beEqualTo(None))
-  }
-
-  def insertTestUserEvents(eventClient: LEvents) = {
-    // events from TestEvents trait
-    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
-    listOfEvents.map{ eventClient.insert(_, appId) }
-
-    success
-  }
-
-  def findUserEvents(eventClient: LEvents) = {
-
-    val results: List[Event] = eventClient.find(
-      appId = appId,
-      entityType = Some("user"))
-      .toList
-      .map(e => e.copy(eventId = None)) // ignore eventID
-
-    // same events in insertTestUserEvents
-    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def aggregateUserProperties(eventClient: LEvents) = {
-
-    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
-      appId = appId,
-      entityType = "user")
-
-    val expected = Map(
-      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateOneUserProperties(eventClient: LEvents) = {
-    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
-      appId = appId,
-      entityType = "user",
-      entityId = "u1")
-
-    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
-    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
-      appId = appId,
-      entityType = "user",
-      entityId = "u999999")
-
-    result must beEqualTo(None)
-  }
-
-  val channelId = 12
-
-  def initChannel(eventClient: LEvents) = {
-    eventClient.init(appId, Some(channelId))
-  }
-
-  def insertChannel(eventClient: LEvents) = {
-
-    // events from TestEvents trait
-    val listOfEvents = List(r4,r5)
-
-    listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
-
-    success
-  }
-
-  def insertAndDeleteChannel(eventClient: LEvents) = {
-
-    val eventId = eventClient.insert(r2, appId, Some(channelId))
-
-    val resultBefore = eventClient.get(eventId, appId, Some(channelId))
-
-    val expectedBefore = r2.copy(eventId = Some(eventId))
-
-    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
-
-    val resultAfter = eventClient.get(eventId, appId, Some(channelId))
-
-    (resultBefore must beEqualTo(Some(expectedBefore))) and
-    (deleteStatus must beEqualTo(true)) and
-    (resultAfter must beEqualTo(None))
-  }
-
-  def findChannel(eventClient: LEvents) = {
-
-    val results: List[Event] = eventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )
-    .toList
-    .map(e => e.copy(eventId = None)) // ignore eventId
-
-    // same events in insertChannel
-    val expected = List(r4, r5)
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def removeDefault(eventClient: LEvents) = {
-    eventClient.remove(appId)
-  }
-
-  def removeChannel(eventClient: LEvents) = {
-    eventClient.remove(appId, Some(channelId))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
deleted file mode 100644
index ccd71a4..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class PEventsSpec extends Specification with TestEvents {
-
-  System.clearProperty("spark.driver.port")
-  System.clearProperty("spark.hostPort")
-  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
-  val appId = 1
-  val channelId = 6
-  val dbName = "test_pio_storage_events_" + hashCode
-
-  def hbLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def hbPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def jdbcLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  def jdbcPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  def stopSpark = {
-    sc.stop()
-  }
-
-  def is = s2"""
-
-  PredictionIO Storage PEvents Specification
-
-    PEvents can be implemented by:
-    - HBPEvents ${hbPEvents}
-    - JDBCPEvents ${jdbcPEvents}
-    - (stop Spark) ${Step(sc.stop())}
-
-  """
-
-  def hbPEvents = sequential ^ s2"""
-
-    HBPEvents should
-    - behave like any PEvents implementation ${events(hbLocal, hbPar)}
-    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
-  """
-
-  def jdbcPEvents = sequential ^ s2"""
-
-    JDBCPEvents should
-    - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
-    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
-    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
-
-  """
-
-  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
-
-    - (init test) ${initTest(localEventClient)}
-    - (insert test events) ${insertTestEvents(localEventClient)}
-    find in default ${find(parEventClient)}
-    find in channel ${findChannel(parEventClient)}
-    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
-    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
-    write to default ${write(parEventClient)}
-    write to channel ${writeChannel(parEventClient)}
-
-  """
-
-  /* setup */
-
-  // events from TestEvents trait
-  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
-  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
-
-  def initTest(localEventClient: LEvents) = {
-    localEventClient.init(appId)
-    localEventClient.init(appId, Some(channelId))
-  }
-
-  def insertTestEvents(localEventClient: LEvents) = {
-    listOfEvents.map( localEventClient.insert(_, appId) )
-    // insert to channel
-    listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) )
-    success
-  }
-
-  /* following are tests */
-
-  def find(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map {_.copy(eventId = None)} // ignore eventId
-
-    results must containTheSameElementsAs(listOfEvents)
-  }
-
-  def findChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map {_.copy(eventId = None)} // ignore eventId
-
-    results must containTheSameElementsAs(listOfEventsChannel)
-  }
-
-  def aggregateUserProperties(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      channelId = Some(channelId),
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def write(parEventClient: PEvents) = {
-    val written = List(r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId)(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map { _.copy(eventId = None)} // ignore eventId
-
-    val expected = listOfEvents ++ written
-
-    results must containTheSameElementsAs(expected)
-  }
-
-  def writeChannel(parEventClient: PEvents) = {
-    val written = List(r1, r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    val results = resultRDD.collect.toList
-      .map { _.copy(eventId = None)} // ignore eventId
-
-    val expected = listOfEventsChannel ++ written
-
-    results must containTheSameElementsAs(expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
deleted file mode 100644
index 747076a..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.apache.predictionio.data.storage.hbase.HBLEvents
-import scalikejdbc._
-
-object StorageTestUtils {
-  val hbaseSourceName = "HBASE"
-  val jdbcSourceName = "PGSQL"
-
-  def dropHBaseNamespace(namespace: String): Unit = {
-    val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
-      .asInstanceOf[HBLEvents]
-    val admin = eventDb.client.admin
-    val tableNames = admin.listTableNamesByNamespace(namespace)
-    tableNames.foreach { name =>
-      admin.disableTable(name)
-      admin.deleteTable(name)
-    }
-
-    //Only empty namespaces (no tables) can be removed.
-    admin.deleteNamespace(namespace)
-  }
-
-  def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
-    SQL(s"drop table $table").execute().apply()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
new file mode 100644
index 0000000..c813ced
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage}
+import org.specs2._
+import org.specs2.specification.Step
+
+class LEventsSpec extends Specification with TestEvents {
+  def is = s2"""
+
+  PredictionIO Storage LEvents Specification
+
+    Events can be implemented by:
+    - HBLEvents ${hbEvents}
+
+  """
+
+  def hbEvents = sequential ^ s2"""
+
+    HBLEvents should
+    - behave like any LEvents implementation ${events(hbDO)}
+    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+  """
+
+  val appId = 1
+
+  def events(eventClient: LEvents) = sequential ^ s2"""
+
+    init default ${initDefault(eventClient)}
+    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+    insert and delete by ID ${insertAndDelete(eventClient)}
+    insert test user events ${insertTestUserEvents(eventClient)}
+    find user events ${findUserEvents(eventClient)}
+    aggregate user properties ${aggregateUserProperties(eventClient)}
+    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+    init channel ${initChannel(eventClient)}
+    insert 2 events to channel ${insertChannel(eventClient)}
+    insert 1 event to channel and delete by ID  ${insertAndDeleteChannel(eventClient)}
+    find events from channel ${findChannel(eventClient)}
+    remove default ${removeDefault(eventClient)}
+    remove channel ${removeChannel(eventClient)}
+
+  """
+
+  val dbName = "test_pio_storage_events_" + hashCode
+  def hbDO = Storage.getDataObject[LEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def initDefault(eventClient: LEvents) = {
+    eventClient.init(appId)
+  }
+
+  def insertAndGetEvents(eventClient: LEvents) = {
+
+    // events from TestEvents trait
+    val listOfEvents = List(r1,r2,r3)
+
+    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
+
+    val insertedEventId: List[String] = insertResp
+
+    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
+
+    val getEvents = getResp
+
+    insertedEvent must containTheSameElementsAs(getEvents)
+  }
+
+  def insertAndGetTimezone(eventClient: LEvents) = {
+    val listOfEvents = List(tz1, tz2, tz3)
+
+    val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
+
+    val insertedEventId: List[String] = insertResp
+
+    val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
+
+    val getEvents = getResp
+
+    insertedEvent must containTheSameElementsAs(getEvents)
+  }
+
+  def insertAndDelete(eventClient: LEvents) = {
+    val eventId = eventClient.insert(r2, appId)
+
+    val resultBefore = eventClient.get(eventId, appId)
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId)
+
+    val resultAfter = eventClient.get(eventId, appId)
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+
+  def insertTestUserEvents(eventClient: LEvents) = {
+    // events from TestEvents trait
+    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    listOfEvents.map{ eventClient.insert(_, appId) }
+
+    success
+  }
+
+  def findUserEvents(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      entityType = Some("user"))
+      .toList
+      .map(e => e.copy(eventId = None)) // ignore eventID
+
+    // same events in insertTestUserEvents
+    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def aggregateUserProperties(eventClient: LEvents) = {
+
+    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user")
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateOneUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u1")
+
+    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u999999")
+
+    result must beEqualTo(None)
+  }
+
+  val channelId = 12
+
+  def initChannel(eventClient: LEvents) = {
+    eventClient.init(appId, Some(channelId))
+  }
+
+  def insertChannel(eventClient: LEvents) = {
+
+    // events from TestEvents trait
+    val listOfEvents = List(r4,r5)
+
+    listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
+
+    success
+  }
+
+  def insertAndDeleteChannel(eventClient: LEvents) = {
+
+    val eventId = eventClient.insert(r2, appId, Some(channelId))
+
+    val resultBefore = eventClient.get(eventId, appId, Some(channelId))
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
+
+    val resultAfter = eventClient.get(eventId, appId, Some(channelId))
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+
+  def findChannel(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )
+    .toList
+    .map(e => e.copy(eventId = None)) // ignore eventId
+
+    // same events in insertChannel
+    val expected = List(r4, r5)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def removeDefault(eventClient: LEvents) = {
+    eventClient.remove(appId)
+  }
+
+  def removeChannel(eventClient: LEvents) = {
+    eventClient.remove(appId, Some(channelId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
new file mode 100644
index 0000000..d675e55
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.specs2._
+import org.specs2.specification.Step
+
+class PEventsSpec extends Specification with TestEvents {
+
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+
+  val appId = 1
+  val channelId = 6
+  val dbName = "test_pio_storage_events_" + hashCode
+
+  def hbLocal = Storage.getDataObject[LEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def hbPar = Storage.getDataObject[PEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def stopSpark = {
+    sc.stop()
+  }
+
+  def is = s2"""
+
+  PredictionIO Storage PEvents Specification
+
+    PEvents can be implemented by:
+    - HBPEvents ${hbPEvents}
+    - (stop Spark) ${Step(sc.stop())}
+
+  """
+
+  def hbPEvents = sequential ^ s2"""
+
+    HBPEvents should
+    - behave like any PEvents implementation ${events(hbLocal, hbPar)}
+    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+  """
+
+  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
+
+    - (init test) ${initTest(localEventClient)}
+    - (insert test events) ${insertTestEvents(localEventClient)}
+    find in default ${find(parEventClient)}
+    find in channel ${findChannel(parEventClient)}
+    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
+    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
+    write to default ${write(parEventClient)}
+    write to channel ${writeChannel(parEventClient)}
+
+  """
+
+  /* setup */
+
+  // events from TestEvents trait
+  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
+  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
+
+  def initTest(localEventClient: LEvents) = {
+    localEventClient.init(appId)
+    localEventClient.init(appId, Some(channelId))
+  }
+
+  def insertTestEvents(localEventClient: LEvents) = {
+    listOfEvents.map( localEventClient.insert(_, appId) )
+    // insert to channel
+    listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) )
+    success
+  }
+
+  /* following are tests */
+
+  def find(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map {_.copy(eventId = None)} // ignore eventId
+
+    results must containTheSameElementsAs(listOfEvents)
+  }
+
+  def findChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map {_.copy(eventId = None)} // ignore eventId
+
+    results must containTheSameElementsAs(listOfEventsChannel)
+  }
+
+  def aggregateUserProperties(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      channelId = Some(channelId),
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def write(parEventClient: PEvents) = {
+    val written = List(r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId)(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map { _.copy(eventId = None)} // ignore eventId
+
+    val expected = listOfEvents ++ written
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def writeChannel(parEventClient: PEvents) = {
+    val written = List(r1, r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map { _.copy(eventId = None)} // ignore eventId
+
+    val expected = listOfEventsChannel ++ written
+
+    results must containTheSameElementsAs(expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
new file mode 100644
index 0000000..161cf90
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{LEvents, Storage}
+
+/** Helpers shared by the HBase-backed storage specs. */
+object StorageTestUtils {
+  /** Name of the storage source passed to `Storage.getDataObject`. */
+  val hbaseSourceName = "HBASE"
+
+  /** Deletes every table in `namespace` and then the namespace itself.
+    *
+    * Tables that are already disabled are deleted without being disabled again, so the
+    * cleanup can be re-run after an earlier attempt stopped between disabling and deleting.
+    *
+    * @param namespace HBase namespace to remove
+    */
+  def dropHBaseNamespace(namespace: String): Unit = {
+    val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
+      .asInstanceOf[HBLEvents]
+    val admin = eventDb.client.admin
+    admin.listTableNamesByNamespace(namespace).foreach { name =>
+      // disableTable rejects a table that is not currently enabled.
+      if (admin.isTableEnabled(name)) {
+        admin.disableTable(name)
+      }
+      admin.deleteTable(name)
+    }
+
+    // Only empty namespaces (no tables) can be removed.
+    admin.deleteNamespace(namespace)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
new file mode 100644
index 0000000..2171864
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{DataMap, Event}
+import org.joda.time.{DateTime, DateTimeZone}
+
+/** Event fixtures shared by the storage specs in this package.
+  *
+  * `u1*`, `u2*` and `u3*` are `$$set`/`$$unset`/`$$delete` events for three users, each group
+  * followed by the property JSON and last-update time that specs compare aggregation
+  * results against. `r1`..`r6` are unrelated events and `tz1`..`tz3` carry explicit time
+  * zone offsets.
+  *
+  * All JSON literals are strictly valid (no trailing commas) so they do not rely on the
+  * parser being lenient.
+  */
+trait TestEvents {
+
+  val u1BaseTime = new DateTime(654321)
+  val u2BaseTime = new DateTime(6543210)
+  val u3BaseTime = new DateTime(6543410)
+
+  // u1 events
+  val u1e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u1",
+    properties = DataMap(
+      """{
+        "a" : 1,
+        "b" : "value2",
+        "d" : [1, 2, 3]
+      }"""),
+    eventTime = u1BaseTime
+  )
+
+  val u1e2 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"a" : 2}"""),
+    eventTime = u1BaseTime.plusDays(1)
+  )
+
+  val u1e3 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value4"}"""),
+    eventTime = u1BaseTime.plusDays(2)
+  )
+
+  val u1e4 = u1e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"b" : null}"""),
+    eventTime = u1BaseTime.plusDays(3)
+  )
+
+  val u1e5 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"e" : "new"}"""),
+    eventTime = u1BaseTime.plusDays(4)
+  )
+
+  // Properties of u1 after u1e1..u1e5: "a" overwritten, "b" unset, "e" added.
+  val u1LastTime = u1BaseTime.plusDays(4)
+  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+  // delete event for u1 (not reflected in the u1 properties above)
+  val u1ed = u1e1.copy(
+    event = "$delete",
+    properties = DataMap(),
+    eventTime = u1BaseTime.plusDays(5)
+  )
+
+  // u2 events
+  val u2e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u2",
+    properties = DataMap(
+      """{
+        "a" : 21,
+        "b" : "value12",
+        "d" : [7, 5, 6]
+      }"""),
+    eventTime = u2BaseTime
+  )
+
+  val u2e2 = u2e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u2BaseTime.plusDays(1)
+  )
+
+  val u2e3 = u2e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+    eventTime = u2BaseTime.plusDays(2)
+  )
+
+  // Properties of u2 after u2e1..u2e3: "a" unset, "b" overwritten, "g" added.
+  val u2LastTime = u2BaseTime.plusDays(2)
+  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
+
+  // u3 events
+  val u3e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u3",
+    properties = DataMap(
+      """{
+        "a" : 22,
+        "b" : "value13",
+        "d" : [5, 6, 1]
+      }"""),
+    eventTime = u3BaseTime
+  )
+
+  val u3e2 = u3e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u3BaseTime.plusDays(1)
+  )
+
+  val u3e3 = u3e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
+    eventTime = u3BaseTime.plusDays(2)
+  )
+
+  // Properties of u3 after u3e1..u3e3: "a" unset, "b" and "d" overwritten, "f" added.
+  val u3LastTime = u3BaseTime.plusDays(2)
+  val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
+
+  // some random events
+  val r1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now,
+    prId = Some("my_prid")
+  )
+  // r2 carries only the mandatory fields.
+  val r2 = Event(
+    event = "my_event2",
+    entityType = "my_entity_type2",
+    entityId = "my_entity_id2"
+  )
+  val r3 = Event(
+    event = "my_event3",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "propA" : 1.2345,
+        "propB" : "valueB"
+      }"""
+    ),
+    prId = Some("my_prid")
+  )
+  val r4 = Event(
+    event = "my_event4",
+    entityType = "my_entity_type4",
+    entityId = "my_entity_id4",
+    targetEntityType = Some("my_target_entity_type4"),
+    targetEntityId = Some("my_target_entity_id4"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""),
+    eventTime = DateTime.now
+  )
+  val r5 = Event(
+    event = "my_event5",
+    entityType = "my_entity_type5",
+    entityId = "my_entity_id5",
+    targetEntityType = Some("my_target_entity_type5"),
+    targetEntityId = Some("my_target_entity_id5"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+  val r6 = Event(
+    event = "my_event6",
+    entityType = "my_entity_type6",
+    entityId = "my_entity_id6",
+    targetEntityType = Some("my_target_entity_type6"),
+    targetEntityId = Some("my_target_entity_id6"),
+    properties = DataMap(
+      """{
+        "prop1" : 6,
+        "prop2" : "value2",
+        "prop3" : [6, 7, 8],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+
+  // timezone: the same instant (12345678 ms) expressed with three different offsets
+  val tz1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id0",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz2 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id1",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz3 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id2",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+    prId = Some("my_prid")
+  )
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
new file mode 100644
index 0000000..d723d07
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage}
+import org.specs2._
+import org.specs2.specification.Step
+
+/** Runs the LEvents behaviour checks against the JDBC-backed event store.
+  *
+  * The examples are declared `sequential` and build on each other: the find and aggregate
+  * examples read what earlier insert examples stored, and the remove examples run last.
+  */
+class LEventsSpec extends Specification with TestEvents {
+  def is = s2"""
+
+  PredictionIO Storage LEvents Specification
+
+    Events can be implemented by:
+    - JDBCLEvents ${jdbcLEvents}
+
+  """
+
+  def jdbcLEvents = sequential ^ s2"""
+
+    JDBCLEvents should
+    - behave like any LEvents implementation ${events(jdbcDO)}
+
+  """
+
+  val appId = 1
+
+  /** The ordered list of examples every LEvents implementation is expected to pass. */
+  def events(eventClient: LEvents) = sequential ^ s2"""
+
+    init default ${initDefault(eventClient)}
+    insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+    insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+    insert and delete by ID ${insertAndDelete(eventClient)}
+    insert test user events ${insertTestUserEvents(eventClient)}
+    find user events ${findUserEvents(eventClient)}
+    aggregate user properties ${aggregateUserProperties(eventClient)}
+    aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+    aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+    init channel ${initChannel(eventClient)}
+    insert 2 events to channel ${insertChannel(eventClient)}
+    insert 1 event to channel and delete by ID  ${insertAndDeleteChannel(eventClient)}
+    find events from channel ${findChannel(eventClient)}
+    remove default ${removeDefault(eventClient)}
+    remove channel ${removeChannel(eventClient)}
+
+  """
+
+  // Namespace handed to the storage layer; suffixed with this instance's hashCode.
+  val dbName = "test_pio_storage_events_" + hashCode
+
+  /** LEvents data object for the JDBC test source under `dbName`. */
+  def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
+
+  def initDefault(eventClient: LEvents) = {
+    eventClient.init(appId)
+  }
+
+  /** Inserts `events` into the default channel, reads each one back by the ID returned from
+    * the insert, and checks that the events read equal the originals carrying those IDs.
+    */
+  private def insertAndGetBack(eventClient: LEvents, events: List[Event]) = {
+    val eventIds: List[String] = events.map { eventClient.insert(_, appId) }
+
+    val expected: List[Option[Event]] = events.zip(eventIds)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val fetched = eventIds.map { id => eventClient.get(id, appId) }
+
+    expected must containTheSameElementsAs(fetched)
+  }
+
+  def insertAndGetEvents(eventClient: LEvents) = {
+    // events from TestEvents trait
+    insertAndGetBack(eventClient, List(r1, r2, r3))
+  }
+
+  def insertAndGetTimezone(eventClient: LEvents) = {
+    insertAndGetBack(eventClient, List(tz1, tz2, tz3))
+  }
+
+  /** Inserts `r2`, checks it can be read, deletes it and checks it is gone. */
+  def insertAndDelete(eventClient: LEvents) = {
+    val eventId = eventClient.insert(r2, appId)
+
+    val resultBefore = eventClient.get(eventId, appId)
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId)
+
+    val resultAfter = eventClient.get(eventId, appId)
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+
+  def insertTestUserEvents(eventClient: LEvents) = {
+    // events from TestEvents trait, deliberately not in eventTime order
+    val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    // Only the side effect matters here; the returned IDs are not needed.
+    listOfEvents.foreach { eventClient.insert(_, appId) }
+
+    success
+  }
+
+  def findUserEvents(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      entityType = Some("user"))
+      .toList
+      .map(e => e.copy(eventId = None)) // ignore eventID
+
+    // same events in insertTestUserEvents
+    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def aggregateUserProperties(eventClient: LEvents) = {
+
+    val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user")
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateOneUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u1")
+
+    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+    val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+      appId = appId,
+      entityType = "user",
+      entityId = "u999999")
+
+    result must beEqualTo(None)
+  }
+
+  val channelId = 12
+
+  def initChannel(eventClient: LEvents) = {
+    eventClient.init(appId, Some(channelId))
+  }
+
+  def insertChannel(eventClient: LEvents) = {
+
+    // events from TestEvents trait
+    val listOfEvents = List(r4, r5)
+
+    // Only the side effect matters here; the returned IDs are not needed.
+    listOfEvents.foreach( eventClient.insert(_, appId, Some(channelId)) )
+
+    success
+  }
+
+  /** Channel variant of `insertAndDelete`: same steps, all addressed to `channelId`. */
+  def insertAndDeleteChannel(eventClient: LEvents) = {
+
+    val eventId = eventClient.insert(r2, appId, Some(channelId))
+
+    val resultBefore = eventClient.get(eventId, appId, Some(channelId))
+
+    val expectedBefore = r2.copy(eventId = Some(eventId))
+
+    val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
+
+    val resultAfter = eventClient.get(eventId, appId, Some(channelId))
+
+    (resultBefore must beEqualTo(Some(expectedBefore))) and
+    (deleteStatus must beEqualTo(true)) and
+    (resultAfter must beEqualTo(None))
+  }
+
+  def findChannel(eventClient: LEvents) = {
+
+    val results: List[Event] = eventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )
+    .toList
+    .map(e => e.copy(eventId = None)) // ignore eventId
+
+    // same events in insertChannel
+    val expected = List(r4, r5)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def removeDefault(eventClient: LEvents) = {
+    eventClient.remove(appId)
+  }
+
+  def removeChannel(eventClient: LEvents) = {
+    eventClient.remove(appId, Some(channelId))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
new file mode 100644
index 0000000..71ebf5f
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.specs2._
+import org.specs2.specification.Step
+
+/** Runs the PEvents behaviour checks against the JDBC-backed event store.
+  *
+  * Test data is seeded through the LEvents client and read or written through the PEvents
+  * client on a local Spark context. The examples are declared `sequential`; the write
+  * examples expect the seeded events to be present already.
+  */
+class PEventsSpec extends Specification with TestEvents {
+
+  // NOTE(review): presumably cleared so settings left by an earlier SparkContext are not
+  // picked up by the one created below — confirm.
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+
+  val appId = 1
+  val channelId = 6
+  // Namespace handed to the storage layer; suffixed with this instance's hashCode.
+  val dbName = "test_pio_storage_events_" + hashCode
+
+  /** LEvents data object used to initialise storage and seed events. */
+  def jdbcLocal = Storage.getDataObject[LEvents](
+    StorageTestUtils.jdbcSourceName,
+    dbName
+  )
+
+  /** PEvents data object under test. */
+  def jdbcPar = Storage.getDataObject[PEvents](
+    StorageTestUtils.jdbcSourceName,
+    dbName
+  )
+
+  /** Stops the Spark context created by this spec. */
+  def stopSpark = {
+    sc.stop()
+  }
+
+  def is = s2"""
+
+  PredictionIO Storage PEvents Specification
+
+    PEvents can be implemented by:
+    - JDBCPEvents ${jdbcPEvents}
+    - (stop Spark) ${Step(stopSpark)}
+
+  """
+
+  def jdbcPEvents = sequential ^ s2"""
+
+    JDBCPEvents should
+    - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
+    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
+    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
+
+  """
+
+  /** The ordered list of examples every PEvents implementation is expected to pass. */
+  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
+
+    - (init test) ${initTest(localEventClient)}
+    - (insert test events) ${insertTestEvents(localEventClient)}
+    find in default ${find(parEventClient)}
+    find in channel ${findChannel(parEventClient)}
+    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
+    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
+    write to default ${write(parEventClient)}
+    write to channel ${writeChannel(parEventClient)}
+
+  """
+
+  /* setup */
+
+  // events from TestEvents trait
+  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
+  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
+
+  /** Initialises storage for the default channel and for `channelId`.
+    *
+    * Both initialisations always run; the result is true only when both report success.
+    */
+  def initTest(localEventClient: LEvents) = {
+    val defaultReady = localEventClient.init(appId)
+    val channelReady = localEventClient.init(appId, Some(channelId))
+    defaultReady && channelReady
+  }
+
+  /** Seeds `listOfEvents` into the default channel and `listOfEventsChannel` into the channel. */
+  def insertTestEvents(localEventClient: LEvents) = {
+    // Only the side effect matters here; the returned IDs are not needed.
+    listOfEvents.foreach( localEventClient.insert(_, appId) )
+    // insert to channel
+    listOfEventsChannel.foreach( localEventClient.insert(_, appId, Some(channelId)) )
+    success
+  }
+
+  /** Collects `events` to the driver and clears every `eventId`, so that results can be
+    * compared with fixtures that carry no ID.
+    */
+  private def collectWithoutIds(events: RDD[Event]): List[Event] =
+    events.collect.toList.map { _.copy(eventId = None) }
+
+  /* following are tests */
+
+  def find(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    collectWithoutIds(resultRDD) must containTheSameElementsAs(listOfEvents)
+  }
+
+  def findChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    collectWithoutIds(resultRDD) must containTheSameElementsAs(listOfEventsChannel)
+  }
+
+  def aggregateUserProperties(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      channelId = Some(channelId),
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  /** Writes `r5` and `r6` to the default channel and expects them next to the seeded events. */
+  def write(parEventClient: PEvents) = {
+    val written = List(r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId)(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    val expected = listOfEvents ++ written
+
+    collectWithoutIds(resultRDD) must containTheSameElementsAs(expected)
+  }
+
+  /** Writes `r1`, `r5` and `r6` to the channel and expects them next to the seeded events. */
+  def writeChannel(parEventClient: PEvents) = {
+    val written = List(r1, r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    val expected = listOfEventsChannel ++ written
+
+    collectWithoutIds(resultRDD) must containTheSameElementsAs(expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
new file mode 100644
index 0000000..4bf90cf
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+/** Helpers shared by the JDBC-backed storage specs. */
+object StorageTestUtils {
+  /** Name of the storage source passed to `Storage.getDataObject`. */
+  val jdbcSourceName = "PGSQL"
+
+  /** Drops `table` if it exists, in an auto-commit session.
+    *
+    * A missing table is not an error, so the cleanup step does not add a second failure
+    * when an earlier step never got as far as creating the table.
+    *
+    * @param table name of the table to drop; interpolated into the statement because
+    *              identifiers cannot be bound as parameters, so pass trusted names only
+    */
+  def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
+    SQL(s"drop table if exists $table").execute().apply()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
new file mode 100644
index 0000000..2cb08e5
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage.{DataMap, Event}
+import org.joda.time.{DateTime, DateTimeZone}
+
+/** Event fixtures shared by the storage specs in this package.
+  *
+  * `u1*`, `u2*` and `u3*` are `$$set`/`$$unset`/`$$delete` events for three users, each group
+  * followed by the property JSON and last-update time that specs compare aggregation
+  * results against. `r1`..`r6` are unrelated events and `tz1`..`tz3` carry explicit time
+  * zone offsets.
+  *
+  * All JSON literals are strictly valid (no trailing commas) so they do not rely on the
+  * parser being lenient.
+  */
+trait TestEvents {
+
+  val u1BaseTime = new DateTime(654321)
+  val u2BaseTime = new DateTime(6543210)
+  val u3BaseTime = new DateTime(6543410)
+
+  // u1 events
+  val u1e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u1",
+    properties = DataMap(
+      """{
+        "a" : 1,
+        "b" : "value2",
+        "d" : [1, 2, 3]
+      }"""),
+    eventTime = u1BaseTime
+  )
+
+  val u1e2 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"a" : 2}"""),
+    eventTime = u1BaseTime.plusDays(1)
+  )
+
+  val u1e3 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value4"}"""),
+    eventTime = u1BaseTime.plusDays(2)
+  )
+
+  val u1e4 = u1e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"b" : null}"""),
+    eventTime = u1BaseTime.plusDays(3)
+  )
+
+  val u1e5 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"e" : "new"}"""),
+    eventTime = u1BaseTime.plusDays(4)
+  )
+
+  // Properties of u1 after u1e1..u1e5: "a" overwritten, "b" unset, "e" added.
+  val u1LastTime = u1BaseTime.plusDays(4)
+  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+  // delete event for u1 (not reflected in the u1 properties above)
+  val u1ed = u1e1.copy(
+    event = "$delete",
+    properties = DataMap(),
+    eventTime = u1BaseTime.plusDays(5)
+  )
+
+  // u2 events
+  val u2e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u2",
+    properties = DataMap(
+      """{
+        "a" : 21,
+        "b" : "value12",
+        "d" : [7, 5, 6]
+      }"""),
+    eventTime = u2BaseTime
+  )
+
+  val u2e2 = u2e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u2BaseTime.plusDays(1)
+  )
+
+  val u2e3 = u2e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+    eventTime = u2BaseTime.plusDays(2)
+  )
+
+  // Properties of u2 after u2e1..u2e3: "a" unset, "b" overwritten, "g" added.
+  val u2LastTime = u2BaseTime.plusDays(2)
+  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
+
+  // u3 events
+  val u3e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u3",
+    properties = DataMap(
+      """{
+        "a" : 22,
+        "b" : "value13",
+        "d" : [5, 6, 1]
+      }"""),
+    eventTime = u3BaseTime
+  )
+
+  val u3e2 = u3e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u3BaseTime.plusDays(1)
+  )
+
+  val u3e3 = u3e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
+    eventTime = u3BaseTime.plusDays(2)
+  )
+
+  // Properties of u3 after u3e1..u3e3: "a" unset, "b" and "d" overwritten, "f" added.
+  val u3LastTime = u3BaseTime.plusDays(2)
+  val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
+
+  // some random events
+  val r1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now,
+    prId = Some("my_prid")
+  )
+  // r2 carries only the mandatory fields.
+  val r2 = Event(
+    event = "my_event2",
+    entityType = "my_entity_type2",
+    entityId = "my_entity_id2"
+  )
+  val r3 = Event(
+    event = "my_event3",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "propA" : 1.2345,
+        "propB" : "valueB"
+      }"""
+    ),
+    prId = Some("my_prid")
+  )
+  val r4 = Event(
+    event = "my_event4",
+    entityType = "my_entity_type4",
+    entityId = "my_entity_id4",
+    targetEntityType = Some("my_target_entity_type4"),
+    targetEntityId = Some("my_target_entity_id4"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""),
+    eventTime = DateTime.now
+  )
+  val r5 = Event(
+    event = "my_event5",
+    entityType = "my_entity_type5",
+    entityId = "my_entity_id5",
+    targetEntityType = Some("my_target_entity_type5"),
+    targetEntityId = Some("my_target_entity_id5"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+  val r6 = Event(
+    event = "my_event6",
+    entityType = "my_entity_type6",
+    entityId = "my_entity_id6",
+    targetEntityType = Some("my_target_entity_type6"),
+    targetEntityId = Some("my_target_entity_id6"),
+    properties = DataMap(
+      """{
+        "prop1" : 6,
+        "prop2" : "value2",
+        "prop3" : [6, 7, 8],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+
+  // timezone: the same instant (12345678 ms) expressed with three different offsets
+  val tz1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id0",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz2 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id1",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz3 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id2",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+    prId = Some("my_prid")
+  )
+
+}