You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/03/16 14:52:15 UTC
[2/7] incubator-predictionio git commit: Move storage depended tests
to each storage project
Move storage depended tests to each storage project
Closes #362
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/647b480b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/647b480b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/647b480b
Branch: refs/heads/feature/xbuild
Commit: 647b480b0f0f693ec1b4d04dc9a2cf1492f0a0fe
Parents: 606be41
Author: Naoki Takezoe <ta...@gmail.com>
Authored: Wed Mar 15 09:42:30 2017 -0700
Committer: Donald Szeto <do...@apache.org>
Committed: Wed Mar 15 09:42:30 2017 -0700
----------------------------------------------------------------------
.../predictionio/data/storage/LEventsSpec.scala | 248 -----------------
.../predictionio/data/storage/PEventsSpec.scala | 213 ---------------
.../data/storage/StorageTestUtils.scala | 45 ----
.../data/storage/hbase/LEventsSpec.scala | 239 +++++++++++++++++
.../data/storage/hbase/PEventsSpec.scala | 192 +++++++++++++
.../data/storage/hbase/StorageTestUtils.scala | 40 +++
.../data/storage/hbase/TestEvents.scala | 266 +++++++++++++++++++
.../data/storage/jdbc/LEventsSpec.scala | 236 ++++++++++++++++
.../data/storage/jdbc/PEventsSpec.scala | 193 ++++++++++++++
.../data/storage/jdbc/StorageTestUtils.scala | 29 ++
.../data/storage/jdbc/TestEvents.scala | 266 +++++++++++++++++++
11 files changed, 1461 insertions(+), 506 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
deleted file mode 100644
index 3938072..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-class LEventsSpec extends Specification with TestEvents {
- def is = s2"""
-
- PredictionIO Storage LEvents Specification
-
- Events can be implemented by:
- - HBLEvents ${hbEvents}
- - JDBCLEvents ${jdbcLEvents}
-
- """
-
- def hbEvents = sequential ^ s2"""
-
- HBLEvents should
- - behave like any LEvents implementation ${events(hbDO)}
- - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
- """
-
- def jdbcLEvents = sequential ^ s2"""
-
- JDBCLEvents should
- - behave like any LEvents implementation ${events(jdbcDO)}
-
- """
-
- val appId = 1
-
- def events(eventClient: LEvents) = sequential ^ s2"""
-
- init default ${initDefault(eventClient)}
- insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
- insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
- insert and delete by ID ${insertAndDelete(eventClient)}
- insert test user events ${insertTestUserEvents(eventClient)}
- find user events ${findUserEvents(eventClient)}
- aggregate user properties ${aggregateUserProperties(eventClient)}
- aggregate one user properties ${aggregateOneUserProperties(eventClient)}
- aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
- init channel ${initChannel(eventClient)}
- insert 2 events to channel ${insertChannel(eventClient)}
- insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
- find events from channel ${findChannel(eventClient)}
- remove default ${removeDefault(eventClient)}
- remove channel ${removeChannel(eventClient)}
-
- """
-
- val dbName = "test_pio_storage_events_" + hashCode
- def hbDO = Storage.getDataObject[LEvents](
- StorageTestUtils.hbaseSourceName,
- dbName
- )
-
- def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
-
- def initDefault(eventClient: LEvents) = {
- eventClient.init(appId)
- }
-
- def insertAndGetEvents(eventClient: LEvents) = {
-
- // events from TestEvents trait
- val listOfEvents = List(r1,r2,r3)
-
- val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
- val insertedEventId: List[String] = insertResp
-
- val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
- .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
- val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
- val getEvents = getResp
-
- insertedEvent must containTheSameElementsAs(getEvents)
- }
-
- def insertAndGetTimezone(eventClient: LEvents) = {
- val listOfEvents = List(tz1, tz2, tz3)
-
- val insertResp = listOfEvents.map { eventClient.insert(_, appId) }
-
- val insertedEventId: List[String] = insertResp
-
- val insertedEvent: List[Option[Event]] = listOfEvents.zip(insertedEventId)
- .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
-
- val getResp = insertedEventId.map { id => eventClient.get(id, appId) }
-
- val getEvents = getResp
-
- insertedEvent must containTheSameElementsAs(getEvents)
- }
-
- def insertAndDelete(eventClient: LEvents) = {
- val eventId = eventClient.insert(r2, appId)
-
- val resultBefore = eventClient.get(eventId, appId)
-
- val expectedBefore = r2.copy(eventId = Some(eventId))
-
- val deleteStatus = eventClient.delete(eventId, appId)
-
- val resultAfter = eventClient.get(eventId, appId)
-
- (resultBefore must beEqualTo(Some(expectedBefore))) and
- (deleteStatus must beEqualTo(true)) and
- (resultAfter must beEqualTo(None))
- }
-
- def insertTestUserEvents(eventClient: LEvents) = {
- // events from TestEvents trait
- val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
- listOfEvents.map{ eventClient.insert(_, appId) }
-
- success
- }
-
- def findUserEvents(eventClient: LEvents) = {
-
- val results: List[Event] = eventClient.find(
- appId = appId,
- entityType = Some("user"))
- .toList
- .map(e => e.copy(eventId = None)) // ignore eventID
-
- // same events in insertTestUserEvents
- val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
-
- results must containTheSameElementsAs(expected)
- }
-
- def aggregateUserProperties(eventClient: LEvents) = {
-
- val result: Map[String, PropertyMap] = eventClient.aggregateProperties(
- appId = appId,
- entityType = "user")
-
- val expected = Map(
- "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- result must beEqualTo(expected)
- }
-
- def aggregateOneUserProperties(eventClient: LEvents) = {
- val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
- appId = appId,
- entityType = "user",
- entityId = "u1")
-
- val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
-
- result must beEqualTo(expected)
- }
-
- def aggregateNonExistentUserProperties(eventClient: LEvents) = {
- val result: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
- appId = appId,
- entityType = "user",
- entityId = "u999999")
-
- result must beEqualTo(None)
- }
-
- val channelId = 12
-
- def initChannel(eventClient: LEvents) = {
- eventClient.init(appId, Some(channelId))
- }
-
- def insertChannel(eventClient: LEvents) = {
-
- // events from TestEvents trait
- val listOfEvents = List(r4,r5)
-
- listOfEvents.map( eventClient.insert(_, appId, Some(channelId)) )
-
- success
- }
-
- def insertAndDeleteChannel(eventClient: LEvents) = {
-
- val eventId = eventClient.insert(r2, appId, Some(channelId))
-
- val resultBefore = eventClient.get(eventId, appId, Some(channelId))
-
- val expectedBefore = r2.copy(eventId = Some(eventId))
-
- val deleteStatus = eventClient.delete(eventId, appId, Some(channelId))
-
- val resultAfter = eventClient.get(eventId, appId, Some(channelId))
-
- (resultBefore must beEqualTo(Some(expectedBefore))) and
- (deleteStatus must beEqualTo(true)) and
- (resultAfter must beEqualTo(None))
- }
-
- def findChannel(eventClient: LEvents) = {
-
- val results: List[Event] = eventClient.find(
- appId = appId,
- channelId = Some(channelId)
- )
- .toList
- .map(e => e.copy(eventId = None)) // ignore eventId
-
- // same events in insertChannel
- val expected = List(r4, r5)
-
- results must containTheSameElementsAs(expected)
- }
-
- def removeDefault(eventClient: LEvents) = {
- eventClient.remove(appId)
- }
-
- def removeChannel(eventClient: LEvents) = {
- eventClient.remove(appId, Some(channelId))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
deleted file mode 100644
index ccd71a4..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class PEventsSpec extends Specification with TestEvents {
-
- System.clearProperty("spark.driver.port")
- System.clearProperty("spark.hostPort")
- val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
- val appId = 1
- val channelId = 6
- val dbName = "test_pio_storage_events_" + hashCode
-
- def hbLocal = Storage.getDataObject[LEvents](
- StorageTestUtils.hbaseSourceName,
- dbName
- )
-
- def hbPar = Storage.getDataObject[PEvents](
- StorageTestUtils.hbaseSourceName,
- dbName
- )
-
- def jdbcLocal = Storage.getDataObject[LEvents](
- StorageTestUtils.jdbcSourceName,
- dbName
- )
-
- def jdbcPar = Storage.getDataObject[PEvents](
- StorageTestUtils.jdbcSourceName,
- dbName
- )
-
- def stopSpark = {
- sc.stop()
- }
-
- def is = s2"""
-
- PredictionIO Storage PEvents Specification
-
- PEvents can be implemented by:
- - HBPEvents ${hbPEvents}
- - JDBCPEvents ${jdbcPEvents}
- - (stop Spark) ${Step(sc.stop())}
-
- """
-
- def hbPEvents = sequential ^ s2"""
-
- HBPEvents should
- - behave like any PEvents implementation ${events(hbLocal, hbPar)}
- - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
- """
-
- def jdbcPEvents = sequential ^ s2"""
-
- JDBCPEvents should
- - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
- - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
- - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
-
- """
-
- def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
-
- - (init test) ${initTest(localEventClient)}
- - (insert test events) ${insertTestEvents(localEventClient)}
- find in default ${find(parEventClient)}
- find in channel ${findChannel(parEventClient)}
- aggregate user properties in default ${aggregateUserProperties(parEventClient)}
- aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
- write to default ${write(parEventClient)}
- write to channel ${writeChannel(parEventClient)}
-
- """
-
- /* setup */
-
- // events from TestEvents trait
- val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
- val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
-
- def initTest(localEventClient: LEvents) = {
- localEventClient.init(appId)
- localEventClient.init(appId, Some(channelId))
- }
-
- def insertTestEvents(localEventClient: LEvents) = {
- listOfEvents.map( localEventClient.insert(_, appId) )
- // insert to channel
- listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) )
- success
- }
-
- /* following are tests */
-
- def find(parEventClient: PEvents) = {
- val resultRDD: RDD[Event] = parEventClient.find(
- appId = appId
- )(sc)
-
- val results = resultRDD.collect.toList
- .map {_.copy(eventId = None)} // ignore eventId
-
- results must containTheSameElementsAs(listOfEvents)
- }
-
- def findChannel(parEventClient: PEvents) = {
- val resultRDD: RDD[Event] = parEventClient.find(
- appId = appId,
- channelId = Some(channelId)
- )(sc)
-
- val results = resultRDD.collect.toList
- .map {_.copy(eventId = None)} // ignore eventId
-
- results must containTheSameElementsAs(listOfEventsChannel)
- }
-
- def aggregateUserProperties(parEventClient: PEvents) = {
- val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
- appId = appId,
- entityType = "user"
- )(sc)
- val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
- val expected = Map(
- "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
- "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
- )
-
- result must beEqualTo(expected)
- }
-
- def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
- val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
- appId = appId,
- channelId = Some(channelId),
- entityType = "user"
- )(sc)
- val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
- val expected = Map(
- "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
- )
-
- result must beEqualTo(expected)
- }
-
- def write(parEventClient: PEvents) = {
- val written = List(r5, r6)
- val writtenRDD = sc.parallelize(written)
- parEventClient.write(writtenRDD, appId)(sc)
-
- // read back
- val resultRDD = parEventClient.find(
- appId = appId
- )(sc)
-
- val results = resultRDD.collect.toList
- .map { _.copy(eventId = None)} // ignore eventId
-
- val expected = listOfEvents ++ written
-
- results must containTheSameElementsAs(expected)
- }
-
- def writeChannel(parEventClient: PEvents) = {
- val written = List(r1, r5, r6)
- val writtenRDD = sc.parallelize(written)
- parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
-
- // read back
- val resultRDD = parEventClient.find(
- appId = appId,
- channelId = Some(channelId)
- )(sc)
-
- val results = resultRDD.collect.toList
- .map { _.copy(eventId = None)} // ignore eventId
-
- val expected = listOfEventsChannel ++ written
-
- results must containTheSameElementsAs(expected)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
deleted file mode 100644
index 747076a..0000000
--- a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-import org.apache.predictionio.data.storage.hbase.HBLEvents
-import scalikejdbc._
-
-object StorageTestUtils {
- val hbaseSourceName = "HBASE"
- val jdbcSourceName = "PGSQL"
-
- def dropHBaseNamespace(namespace: String): Unit = {
- val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
- .asInstanceOf[HBLEvents]
- val admin = eventDb.client.admin
- val tableNames = admin.listTableNamesByNamespace(namespace)
- tableNames.foreach { name =>
- admin.disableTable(name)
- admin.deleteTable(name)
- }
-
- //Only empty namespaces (no tables) can be removed.
- admin.deleteNamespace(namespace)
- }
-
- def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
- SQL(s"drop table $table").execute().apply()
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
new file mode 100644
index 0000000..c813ced
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/LEventsSpec.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage}
+import org.specs2._
+import org.specs2.specification.Step
+
+class LEventsSpec extends Specification with TestEvents {
+ def is = s2"""
+
+ PredictionIO Storage LEvents Specification
+
+ Events can be implemented by:
+ - HBLEvents ${hbEvents}
+
+ """
+ // HBase example group: the shared LEvents examples run in order, followed by a Step that drops the test namespace.
+ def hbEvents = sequential ^ s2"""
+
+ HBLEvents should
+ - behave like any LEvents implementation ${events(hbDO)}
+ - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+ """
+ // Application ID that every example in this spec passes to the event client.
+ val appId = 1
+ // Implementation-independent LEvents examples; sequential because later examples read what earlier ones wrote.
+ def events(eventClient: LEvents) = sequential ^ s2"""
+
+ init default ${initDefault(eventClient)}
+ insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+ insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+ insert and delete by ID ${insertAndDelete(eventClient)}
+ insert test user events ${insertTestUserEvents(eventClient)}
+ find user events ${findUserEvents(eventClient)}
+ aggregate user properties ${aggregateUserProperties(eventClient)}
+ aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+ aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+ init channel ${initChannel(eventClient)}
+ insert 2 events to channel ${insertChannel(eventClient)}
+ insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
+ find events from channel ${findChannel(eventClient)}
+ remove default ${removeDefault(eventClient)}
+ remove channel ${removeChannel(eventClient)}
+
+ """
+
+  // Storage name for this spec: a fixed prefix followed by this object's hashCode.
+  val dbName = s"test_pio_storage_events_$hashCode"
+
+  // A def, so the data object is looked up each time it is referenced.
+  def hbDO = Storage.getDataObject[LEvents](StorageTestUtils.hbaseSourceName, dbName)
+
+  // Example body for "init default": whatever init(appId) returns is the outcome.
+  def initDefault(eventClient: LEvents) =
+    eventClient.init(appId)
+
+  /**
+   * Shared body of the two round-trip examples below: inserts `events` into
+   * the default channel of `appId`, reads every one back by the ID that
+   * `insert` returned, and compares what was read with what was written.
+   *
+   * @param eventClient the LEvents implementation under test
+   * @param events      events to round-trip
+   * @return the specs2 match result of the comparison
+   */
+  private def insertAndGetBack(eventClient: LEvents, events: List[Event]) = {
+    val eventIds: List[String] = events.map { eventClient.insert(_, appId) }
+
+    // The ID comes from insert, so the expectation is each original event
+    // with that ID filled in.
+    val expected: List[Option[Event]] = events.zip(eventIds)
+      .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+    val fetched = eventIds.map { id => eventClient.get(id, appId) }
+
+    // Value under test on the left, expectation inside the matcher, so a
+    // failure is reported in terms of what the store returned.
+    fetched must containTheSameElementsAs(expected)
+  }
+
+  /** Round-trips the fixtures r1, r2 and r3 (from the TestEvents trait). */
+  def insertAndGetEvents(eventClient: LEvents) = {
+    val listOfEvents = List(r1, r2, r3)
+    insertAndGetBack(eventClient, listOfEvents)
+  }
+
+  /** Round-trips the fixtures tz1, tz2 and tz3 (the "with timezone" events). */
+  def insertAndGetTimezone(eventClient: LEvents) = {
+    val listOfEvents = List(tz1, tz2, tz3)
+    insertAndGetBack(eventClient, listOfEvents)
+  }
+
+  /**
+   * Inserts r2 into the default channel, deletes it by the returned ID, and
+   * checks the event is readable before the delete and gone afterwards.
+   */
+  def insertAndDelete(eventClient: LEvents) = {
+    val id = eventClient.insert(r2, appId)
+    val beforeDelete = eventClient.get(id, appId)
+    val deleted = eventClient.delete(id, appId)
+    val afterDelete = eventClient.get(id, appId)
+
+    // Expected pre-delete value: r2 carrying the ID returned by insert.
+    (beforeDelete must beEqualTo(Some(r2.copy(eventId = Some(id))))) and
+      (deleted must beEqualTo(true)) and
+      (afterDelete must beEqualTo(None))
+  }
+
+  /** Inserts the user fixtures (from the TestEvents trait) that later examples query. */
+  def insertTestUserEvents(eventClient: LEvents) = {
+    val userEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    // foreach, not map: the returned event IDs are not used here.
+    userEvents.foreach { eventClient.insert(_, appId) }
+    success
+  }
+
+  /**
+   * Finds every event of entity type "user" in the default channel and checks
+   * it against the fixtures that insertTestUserEvents stored.
+   */
+  def findUserEvents(eventClient: LEvents) = {
+    val found = eventClient.find(appId = appId, entityType = Some("user"))
+
+    // Event IDs are blanked so the comparison ignores them.
+    val results: List[Event] = found.toList.map(_.copy(eventId = None))
+    val expected = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  /**
+   * Aggregates the properties of all "user" entities in the default channel
+   * and expects one PropertyMap each for u1 and u2.
+   */
+  def aggregateUserProperties(eventClient: LEvents) = {
+    val u1Props = PropertyMap(u1, u1BaseTime, u1LastTime)
+    val u2Props = PropertyMap(u2, u2BaseTime, u2LastTime)
+    val expected = Map("u1" -> u1Props, "u2" -> u2Props)
+
+    val aggregated: Map[String, PropertyMap] =
+      eventClient.aggregateProperties(appId = appId, entityType = "user")
+    aggregated must beEqualTo(expected)
+  }
+
+  /** Aggregates the properties of user "u1" alone and expects its PropertyMap. */
+  def aggregateOneUserProperties(eventClient: LEvents) = {
+    val expected = Some(PropertyMap(u1, u1BaseTime, u1LastTime))
+
+    val aggregated: Option[PropertyMap] =
+      eventClient.aggregatePropertiesOfEntity(
+        appId = appId, entityType = "user", entityId = "u1")
+
+    aggregated must beEqualTo(expected)
+  }
+
+  /** Aggregating for entity ID "u999999" is expected to yield None. */
+  def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+    val missingUser = "u999999"
+    val aggregated: Option[PropertyMap] =
+      eventClient.aggregatePropertiesOfEntity(
+        appId = appId, entityType = "user", entityId = missingUser)
+    aggregated must beEqualTo(None)
+  }
+
+  // Channel ID passed by the channel examples below.
+  val channelId = 12
+
+  // Example body for "init channel": returns what init gives for (appId, channelId).
+  def initChannel(eventClient: LEvents) = eventClient.init(appId, Some(channelId))
+
+  /** Inserts the fixtures r4 and r5 (from the TestEvents trait) into the channel. */
+  def insertChannel(eventClient: LEvents) = {
+    val channelEvents = List(r4, r5)
+
+    // foreach, not map: the returned event IDs are not used here.
+    channelEvents.foreach { eventClient.insert(_, appId, Some(channelId)) }
+
+    success
+  }
+
+  /**
+   * Channel variant of the insert/delete round trip: inserts r2 into the
+   * channel, deletes it by the returned ID, and checks it is readable before
+   * the delete and gone afterwards.
+   */
+  def insertAndDeleteChannel(eventClient: LEvents) = {
+    val channel = Some(channelId)
+    val id = eventClient.insert(r2, appId, channel)
+    val beforeDelete = eventClient.get(id, appId, channel)
+    val deleted = eventClient.delete(id, appId, channel)
+    val afterDelete = eventClient.get(id, appId, channel)
+
+    (beforeDelete must beEqualTo(Some(r2.copy(eventId = Some(id))))) and
+      (deleted must beEqualTo(true)) and
+      (afterDelete must beEqualTo(None))
+  }
+
+  /**
+   * Finds every event in the channel and checks it against r4 and r5, the
+   * fixtures that insertChannel stored there (the r2 inserted by
+   * insertAndDeleteChannel was deleted again in that example).
+   */
+  def findChannel(eventClient: LEvents) = {
+    val found = eventClient.find(appId = appId, channelId = Some(channelId))
+
+    // Event IDs are blanked so the comparison ignores them.
+    val results: List[Event] = found.toList.map(_.copy(eventId = None))
+    val expected = List(r4, r5)
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  // Example body for "remove default": returns what remove(appId) gives.
+  def removeDefault(eventClient: LEvents) =
+    eventClient.remove(appId)
+
+  // Example body for "remove channel": the same call, scoped to channelId.
+  def removeChannel(eventClient: LEvents) =
+    eventClient.remove(appId, Some(channelId))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
new file mode 100644
index 0000000..d675e55
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/PEventsSpec.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.specs2._
+import org.specs2.specification.Step
+
+class PEventsSpec extends Specification with TestEvents {
+ // Clear the spark.driver.port and spark.hostPort system properties, then create the SparkContext every example shares.
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+ // Application ID, channel ID and storage name used throughout this spec.
+ val appId = 1
+ val channelId = 6
+ val dbName = "test_pio_storage_events_" + hashCode
+
+  // LEvents client for the test storage name; the spec uses it to init the
+  // app and insert the fixtures.
+  def hbLocal =
+    Storage.getDataObject[LEvents](StorageTestUtils.hbaseSourceName, dbName)
+
+  // PEvents client for the same source and name; the find, aggregate and
+  // write examples run against it.
+  def hbPar =
+    Storage.getDataObject[PEvents](StorageTestUtils.hbaseSourceName, dbName)
+
+  // Stops the shared SparkContext. The spec text of this class does not use
+  // it; its final Step calls sc.stop() directly.
+  def stopSpark = sc.stop()
+ // Spec root: the HBase example group, then a Step that stops the SparkContext.
+ def is = s2"""
+
+ PredictionIO Storage PEvents Specification
+
+ PEvents can be implemented by:
+ - HBPEvents ${hbPEvents}
+ - (stop Spark) ${Step(sc.stop())}
+
+ """
+ // HBase example group: the shared PEvents examples run in order, followed by a Step that drops the test namespace.
+ def hbPEvents = sequential ^ s2"""
+
+ HBPEvents should
+ - behave like any PEvents implementation ${events(hbLocal, hbPar)}
+ - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+ """
+ // Setup runs through localEventClient (init, insert); the examples themselves exercise parEventClient.
+ def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
+
+ - (init test) ${initTest(localEventClient)}
+ - (insert test events) ${insertTestEvents(localEventClient)}
+ find in default ${find(parEventClient)}
+ find in channel ${findChannel(parEventClient)}
+ aggregate user properties in default ${aggregateUserProperties(parEventClient)}
+ aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
+ write to default ${write(parEventClient)}
+ write to channel ${writeChannel(parEventClient)}
+
+ """
+
+ /* setup */
+ // listOfEvents is inserted into the default channel, listOfEventsChannel into channelId (see insertTestEvents).
+ // events from TestEvents trait
+ val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
+ val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
+
+  // Both init results are asserted, so a failed default-channel init cannot go unnoticed.
+  def initTest(localEventClient: LEvents) =
+    (localEventClient.init(appId) must beTrue) and
+      (localEventClient.init(appId, Some(channelId)) must beTrue)
+
+  // Seeds both channels; foreach rather than map because the returned IDs are unused.
+  def insertTestEvents(localEventClient: LEvents) = {
+    listOfEvents.foreach { localEventClient.insert(_, appId) }
+    listOfEventsChannel.foreach { localEventClient.insert(_, appId, Some(channelId)) }
+    success
+  }
+
+ /* following are tests */
+
+  /**
+   * Reads the default channel back through PEvents and expects exactly the
+   * events in listOfEvents.
+   */
+  def find(parEventClient: PEvents) = {
+    val eventsRDD: RDD[Event] = parEventClient.find(appId = appId)(sc)
+    // Event IDs are blanked so the comparison ignores them.
+    val found = eventsRDD.collect.toList.map { e => e.copy(eventId = None) }
+    found must containTheSameElementsAs(listOfEvents)
+  }
+
+  /**
+   * Reads the channel back through PEvents and expects exactly the events in
+   * listOfEventsChannel.
+   */
+  def findChannel(parEventClient: PEvents) = {
+    val eventsRDD: RDD[Event] =
+      parEventClient.find(appId = appId, channelId = Some(channelId))(sc)
+    // Event IDs are blanked so the comparison ignores them.
+    val found = eventsRDD.collect.toList.map { e => e.copy(eventId = None) }
+    found must containTheSameElementsAs(listOfEventsChannel)
+  }
+
+  /**
+   * Aggregates "user" properties in the default channel through PEvents and
+   * expects one PropertyMap each for u1 and u2.
+   */
+  def aggregateUserProperties(parEventClient: PEvents) = {
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime))
+
+    val propsRDD: RDD[(String, PropertyMap)] =
+      parEventClient.aggregateProperties(appId = appId, entityType = "user")(sc)
+    val aggregated: Map[String, PropertyMap] = propsRDD.collectAsMap.toMap
+    aggregated must beEqualTo(expected)
+  }
+
+  /**
+   * Aggregates "user" properties in the channel through PEvents and expects a
+   * single PropertyMap, for u3.
+   */
+  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
+    val expected = Map("u3" -> PropertyMap(u3, u3BaseTime, u3LastTime))
+
+    val propsRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      channelId = Some(channelId),
+      entityType = "user")(sc)
+    val aggregated: Map[String, PropertyMap] = propsRDD.collectAsMap.toMap
+    aggregated must beEqualTo(expected)
+  }
+
+  /**
+   * Writes r5 and r6 to the default channel through PEvents, reads the channel
+   * back, and expects the seeded listOfEvents plus the two new events.
+   */
+  def write(parEventClient: PEvents) = {
+    val written = List(r5, r6)
+    parEventClient.write(sc.parallelize(written), appId)(sc)
+
+    // Read everything back; event IDs are blanked so the comparison ignores them.
+    val readBack = parEventClient
+      .find(appId = appId)(sc)
+      .collect
+      .toList
+      .map { e => e.copy(eventId = None) }
+
+    readBack must containTheSameElementsAs(listOfEvents ++ written)
+  }
+
+  /**
+   * Writes r1, r5 and r6 to the channel through PEvents, reads the channel
+   * back, and expects the seeded listOfEventsChannel plus the three new events.
+   */
+  def writeChannel(parEventClient: PEvents) = {
+    val written = List(r1, r5, r6)
+    parEventClient.write(sc.parallelize(written), appId, Some(channelId))(sc)
+
+    // Read everything back; event IDs are blanked so the comparison ignores them.
+    val readBack = parEventClient
+      .find(appId = appId, channelId = Some(channelId))(sc)
+      .collect
+      .toList
+      .map { e => e.copy(eventId = None) }
+    val expected = listOfEventsChannel ++ written
+
+    readBack must containTheSameElementsAs(expected)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
new file mode 100644
index 0000000..161cf90
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/StorageTestUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{LEvents, Storage}
+
+ /** Helpers for cleaning up after the HBase storage specs. */
+ object StorageTestUtils {
+   val hbaseSourceName = "HBASE"
+
+   /**
+    * Disables and deletes every table listed for `namespace`, then deletes the namespace.
+    *
+    * The admin handle is taken from the HBLEvents data object obtained for `hbaseSourceName`.
+    */
+   def dropHBaseNamespace(namespace: String): Unit = {
+     val lEvents = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
+     val admin = lEvents.asInstanceOf[HBLEvents].client.admin
+
+     for (table <- admin.listTableNamesByNamespace(namespace)) {
+       admin.disableTable(table)
+       admin.deleteTable(table)
+     }
+
+     // Only empty namespaces (no tables) can be removed.
+     admin.deleteNamespace(namespace)
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
----------------------------------------------------------------------
diff --git a/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
new file mode 100644
index 0000000..2171864
--- /dev/null
+++ b/storage/hbase/src/test/scala/org/apache/predictionio/data/storage/hbase/TestEvents.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.hbase
+
+import org.apache.predictionio.data.storage.{DataMap, Event}
+import org.joda.time.{DateTime, DateTimeZone}
+
+ /**
+  * Event fixtures shared by the storage specs.
+  *
+  * Contains property events for three users (u1, u2, u3) together with the property JSON and
+  * first/last event times that the specs pass to PropertyMap as the expected aggregate, six
+  * unrelated events (r1-r6), and three events (tz1-tz3) that share one instant but use different
+  * time zone offsets.
+  *
+  * Every JSON literal is strictly valid: no trailing comma precedes a closing brace, so the
+  * fixtures do not rely on a lenient parser.
+  */
+ trait TestEvents {
+
+   // Instants (epoch milliseconds) of the first event of each user.
+   val u1BaseTime = new DateTime(654321)
+   val u2BaseTime = new DateTime(6543210)
+   val u3BaseTime = new DateTime(6543410)
+
+   // u1 events
+   val u1e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u1",
+     properties = DataMap(
+       """{
+ "a" : 1,
+ "b" : "value2",
+ "d" : [1, 2, 3]
+ }"""),
+     eventTime = u1BaseTime
+   )
+
+   val u1e2 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"a" : 2}"""),
+     eventTime = u1BaseTime.plusDays(1)
+   )
+
+   val u1e3 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value4"}"""),
+     eventTime = u1BaseTime.plusDays(2)
+   )
+
+   val u1e4 = u1e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"b" : null}"""),
+     eventTime = u1BaseTime.plusDays(3)
+   )
+
+   val u1e5 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"e" : "new"}"""),
+     eventTime = u1BaseTime.plusDays(4)
+   )
+
+   // Same instant as u1e5. The JSON reflects u1e1-u1e5: "a" overwritten by u1e2, "b" unset by
+   // u1e4, "e" added by u1e5.
+   val u1LastTime = u1BaseTime.plusDays(4)
+   val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+   // delete event for u1
+   val u1ed = u1e1.copy(
+     event = "$delete",
+     properties = DataMap(),
+     eventTime = u1BaseTime.plusDays(5)
+   )
+
+   // u2 events
+   val u2e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u2",
+     properties = DataMap(
+       """{
+ "a" : 21,
+ "b" : "value12",
+ "d" : [7, 5, 6]
+ }"""),
+     eventTime = u2BaseTime
+   )
+
+   val u2e2 = u2e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"a" : null}"""),
+     eventTime = u2BaseTime.plusDays(1)
+   )
+
+   val u2e3 = u2e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+     eventTime = u2BaseTime.plusDays(2)
+   )
+
+   // Same instant as u2e3. The JSON reflects u2e1-u2e3: "a" unset by u2e2, "b" and "g" set by u2e3.
+   val u2LastTime = u2BaseTime.plusDays(2)
+   val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
+
+   // u3 events
+   val u3e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u3",
+     properties = DataMap(
+       """{
+ "a" : 22,
+ "b" : "value13",
+ "d" : [5, 6, 1]
+ }"""),
+     eventTime = u3BaseTime
+   )
+
+   val u3e2 = u3e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"a" : null}"""),
+     eventTime = u3BaseTime.plusDays(1)
+   )
+
+   val u3e3 = u3e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
+     eventTime = u3BaseTime.plusDays(2)
+   )
+
+   // Same instant as u3e3. The JSON reflects u3e1-u3e3: "a" unset by u3e2; "b", "f" and "d" set
+   // by u3e3.
+   val u3LastTime = u3BaseTime.plusDays(2)
+   val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
+
+   // some random events
+   val r1 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now,
+     prId = Some("my_prid")
+   )
+   // r2 passes only the three identifying fields; every other Event field keeps its default.
+   val r2 = Event(
+     event = "my_event2",
+     entityType = "my_entity_type2",
+     entityId = "my_entity_id2"
+   )
+   // r3 has no explicit eventTime either.
+   val r3 = Event(
+     event = "my_event3",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "propA" : 1.2345,
+ "propB" : "valueB"
+ }"""
+     ),
+     prId = Some("my_prid")
+   )
+   val r4 = Event(
+     event = "my_event4",
+     entityType = "my_entity_type4",
+     entityId = "my_entity_id4",
+     targetEntityType = Some("my_target_entity_type4"),
+     targetEntityId = Some("my_target_entity_id4"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""),
+     eventTime = DateTime.now
+   )
+   val r5 = Event(
+     event = "my_event5",
+     entityType = "my_entity_type5",
+     entityId = "my_entity_id5",
+     targetEntityType = Some("my_target_entity_type5"),
+     targetEntityId = Some("my_target_entity_id5"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now
+   )
+   val r6 = Event(
+     event = "my_event6",
+     entityType = "my_entity_type6",
+     entityId = "my_entity_id6",
+     targetEntityType = Some("my_target_entity_type6"),
+     targetEntityId = Some("my_target_entity_id6"),
+     properties = DataMap(
+       """{
+ "prop1" : 6,
+ "prop2" : "value2",
+ "prop3" : [6, 7, 8],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now
+   )
+
+   // timezone: tz1-tz3 use the same millisecond instant with offsets -08:00, +02:00 and +08:00
+   val tz1 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id0",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+     prId = Some("my_prid")
+   )
+
+   val tz2 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id1",
+     eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+     prId = Some("my_prid")
+   )
+
+   val tz3 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id2",
+     eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+     prId = Some("my_prid")
+   )
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
new file mode 100644
index 0000000..d723d07
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/LEventsSpec.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage.{Event, LEvents, PropertyMap, Storage}
+import org.specs2._
+import org.specs2.specification.Step
+
+ /**
+  * specs2 acceptance specification exercising the LEvents operations (init, insert, get, delete,
+  * find, aggregate, remove) on the data object obtained for the JDBC source.
+  *
+  * The examples are declared `sequential`; later examples read what earlier ones inserted.
+  */
+ class LEventsSpec extends Specification with TestEvents {
+
+   val appId = 1
+   val channelId = 12
+   val dbName = "test_pio_storage_events_" + hashCode
+
+   def is = s2"""
+
+ PredictionIO Storage LEvents Specification
+
+ Events can be implemented by:
+ - JDBCLEvents ${jdbcLEvents}
+
+ """
+
+   def jdbcLEvents = sequential ^ s2"""
+
+ JDBCLEvents should
+ - behave like any LEvents implementation ${events(jdbcDO)}
+
+ """
+
+   def events(eventClient: LEvents) = sequential ^ s2"""
+
+ init default ${initDefault(eventClient)}
+ insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+ insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+ insert and delete by ID ${insertAndDelete(eventClient)}
+ insert test user events ${insertTestUserEvents(eventClient)}
+ find user events ${findUserEvents(eventClient)}
+ aggregate user properties ${aggregateUserProperties(eventClient)}
+ aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+ aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+ init channel ${initChannel(eventClient)}
+ insert 2 events to channel ${insertChannel(eventClient)}
+ insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
+ find events from channel ${findChannel(eventClient)}
+ remove default ${removeDefault(eventClient)}
+ remove channel ${removeChannel(eventClient)}
+
+ """
+
+   /** Obtains the LEvents data object for the JDBC source name and `dbName`. */
+   def jdbcDO = Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
+
+   /** Initialises the event store for `appId` without a channel. */
+   def initDefault(eventClient: LEvents) = eventClient.init(appId)
+
+   /**
+    * Inserts all `events` for `appId`, then fetches each one by the returned ID and expects to
+    * get the same events back with `eventId` filled in.
+    */
+   private def insertThenFetch(eventClient: LEvents, events: List[Event]) = {
+     val ids: List[String] = events.map(event => eventClient.insert(event, appId))
+
+     val expected: List[Option[Event]] =
+       events.zip(ids).map { case (event, id) => Some(event.copy(eventId = Some(id))) }
+
+     val fetched = ids.map(id => eventClient.get(id, appId))
+
+     expected must containTheSameElementsAs(fetched)
+   }
+
+   /** Round-trips r1, r2 and r3 (events from the TestEvents trait) by event ID. */
+   def insertAndGetEvents(eventClient: LEvents) =
+     insertThenFetch(eventClient, List(r1, r2, r3))
+
+   /** Round-trips the three time zone fixtures tz1, tz2 and tz3 by event ID. */
+   def insertAndGetTimezone(eventClient: LEvents) =
+     insertThenFetch(eventClient, List(tz1, tz2, tz3))
+
+   /** Inserts r2, checks it can be read, deletes it and checks it can no longer be read. */
+   def insertAndDelete(eventClient: LEvents) = {
+     val id = eventClient.insert(r2, appId)
+     val beforeDelete = eventClient.get(id, appId)
+     val deleted = eventClient.delete(id, appId)
+     val afterDelete = eventClient.get(id, appId)
+
+     (beforeDelete must beEqualTo(Some(r2.copy(eventId = Some(id))))) and
+       (deleted must beEqualTo(true)) and
+       (afterDelete must beEqualTo(None))
+   }
+
+   /** Inserts the u1 and u2 property events; always reports success. */
+   def insertTestUserEvents(eventClient: LEvents) = {
+     // events from TestEvents trait
+     Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+       .foreach(event => eventClient.insert(event, appId))
+
+     success
+   }
+
+   /** Finds the events with entity type "user" and expects the eight user events. */
+   def findUserEvents(eventClient: LEvents) = {
+     // same events in insertTestUserEvents
+     val wanted = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+     val found: List[Event] = eventClient
+       .find(appId = appId, entityType = Some("user"))
+       .toList
+       .map(_.copy(eventId = None)) // ignore eventID
+
+     found must containTheSameElementsAs(wanted)
+   }
+
+   /** Aggregates all "user" entities and expects the u1 and u2 fixture states. */
+   def aggregateUserProperties(eventClient: LEvents) = {
+     val aggregated: Map[String, PropertyMap] =
+       eventClient.aggregateProperties(appId = appId, entityType = "user")
+
+     val wanted = Map(
+       "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+       "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+     )
+
+     aggregated must beEqualTo(wanted)
+   }
+
+   /** Aggregates the single entity "u1" and expects its fixture state. */
+   def aggregateOneUserProperties(eventClient: LEvents) = {
+     val aggregated: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+       appId = appId,
+       entityType = "user",
+       entityId = "u1")
+
+     aggregated must beEqualTo(Some(PropertyMap(u1, u1BaseTime, u1LastTime)))
+   }
+
+   /** Aggregating an entity ID that was never inserted is expected to yield None. */
+   def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+     val aggregated: Option[PropertyMap] = eventClient.aggregatePropertiesOfEntity(
+       appId = appId,
+       entityType = "user",
+       entityId = "u999999")
+
+     aggregated must beEqualTo(None)
+   }
+
+   /** Initialises the event store for `appId` and `channelId`. */
+   def initChannel(eventClient: LEvents) = eventClient.init(appId, Some(channelId))
+
+   /** Inserts r4 and r5 into the channel; always reports success. */
+   def insertChannel(eventClient: LEvents) = {
+     // events from TestEvents trait
+     List(r4, r5).foreach(event => eventClient.insert(event, appId, Some(channelId)))
+
+     success
+   }
+
+   /** Channel variant of insertAndDelete: insert r2, read, delete, read again. */
+   def insertAndDeleteChannel(eventClient: LEvents) = {
+     val channel = Some(channelId)
+
+     val id = eventClient.insert(r2, appId, channel)
+     val beforeDelete = eventClient.get(id, appId, channel)
+     val deleted = eventClient.delete(id, appId, channel)
+     val afterDelete = eventClient.get(id, appId, channel)
+
+     (beforeDelete must beEqualTo(Some(r2.copy(eventId = Some(id))))) and
+       (deleted must beEqualTo(true)) and
+       (afterDelete must beEqualTo(None))
+   }
+
+   /** Finds everything in the channel and expects exactly r4 and r5. */
+   def findChannel(eventClient: LEvents) = {
+     // same events in insertChannel
+     val wanted = List(r4, r5)
+
+     val found: List[Event] = eventClient
+       .find(
+         appId = appId,
+         channelId = Some(channelId)
+       )
+       .toList
+       .map(_.copy(eventId = None)) // ignore eventId
+
+     found must containTheSameElementsAs(wanted)
+   }
+
+   /** Removes the event store of `appId` (no channel). */
+   def removeDefault(eventClient: LEvents) = eventClient.remove(appId)
+
+   /** Removes the event store of `appId` and `channelId`. */
+   def removeChannel(eventClient: LEvents) = eventClient.remove(appId, Some(channelId))
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
new file mode 100644
index 0000000..71ebf5f
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/PEventsSpec.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage._
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.specs2._
+import org.specs2.specification.Step
+
+ /**
+  * specs2 acceptance specification for the PEvents data object of the JDBC source.
+  *
+  * Test data is inserted through the LEvents client and read, aggregated and written through
+  * the PEvents client using a local SparkContext. The examples are declared `sequential`, and
+  * the specification text ends with steps that drop the two tables and stop Spark.
+  */
+ class PEventsSpec extends Specification with TestEvents {
+
+   System.clearProperty("spark.driver.port")
+   System.clearProperty("spark.hostPort")
+   val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+
+   val appId = 1
+   val channelId = 6
+   val dbName = "test_pio_storage_events_" + hashCode
+
+   /* setup */
+
+   // events from TestEvents trait
+   val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
+   val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
+
+   /** LEvents data object for the JDBC source name and `dbName`. */
+   def jdbcLocal =
+     Storage.getDataObject[LEvents](StorageTestUtils.jdbcSourceName, dbName)
+
+   /** PEvents data object for the JDBC source name and `dbName`. */
+   def jdbcPar =
+     Storage.getDataObject[PEvents](StorageTestUtils.jdbcSourceName, dbName)
+
+   /** Stops the SparkContext created by this specification. */
+   def stopSpark = {
+     sc.stop()
+   }
+
+   def is = s2"""
+
+ PredictionIO Storage PEvents Specification
+
+ PEvents can be implemented by:
+ - JDBCPEvents ${jdbcPEvents}
+ - (stop Spark) ${Step(sc.stop())}
+
+ """
+
+   def jdbcPEvents = sequential ^ s2"""
+
+ JDBCPEvents should
+ - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
+ - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
+ - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
+
+ """
+
+   def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
+
+ - (init test) ${initTest(localEventClient)}
+ - (insert test events) ${insertTestEvents(localEventClient)}
+ find in default ${find(parEventClient)}
+ find in channel ${findChannel(parEventClient)}
+ aggregate user properties in default ${aggregateUserProperties(parEventClient)}
+ aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
+ write to default ${write(parEventClient)}
+ write to channel ${writeChannel(parEventClient)}
+
+ """
+
+   /** Initialises the store for `appId`, first without a channel and then for `channelId`. */
+   def initTest(localEventClient: LEvents) = {
+     localEventClient.init(appId)
+     localEventClient.init(appId, Some(channelId))
+   }
+
+   /** Inserts `listOfEvents` without a channel and `listOfEventsChannel` into the channel. */
+   def insertTestEvents(localEventClient: LEvents) = {
+     listOfEvents.foreach(event => localEventClient.insert(event, appId))
+     // insert to channel
+     listOfEventsChannel.foreach(event => localEventClient.insert(event, appId, Some(channelId)))
+     success
+   }
+
+   /* following are tests */
+
+   /** Reads the app's events without a channel id and expects `listOfEvents`. */
+   def find(parEventClient: PEvents) = {
+     val found = parEventClient
+       .find(appId = appId)(sc)
+       .collect
+       .toList
+       .map(event => event.copy(eventId = None)) // ignore eventId
+
+     found must containTheSameElementsAs(listOfEvents)
+   }
+
+   /** Reads the events of `channelId` and expects `listOfEventsChannel`. */
+   def findChannel(parEventClient: PEvents) = {
+     val found = parEventClient
+       .find(
+         appId = appId,
+         channelId = Some(channelId)
+       )(sc)
+       .collect
+       .toList
+       .map(event => event.copy(eventId = None)) // ignore eventId
+
+     found must containTheSameElementsAs(listOfEventsChannel)
+   }
+
+   /** Aggregates "user" entities without a channel id and expects the u1 and u2 states. */
+   def aggregateUserProperties(parEventClient: PEvents) = {
+     val aggregated: Map[String, PropertyMap] = parEventClient
+       .aggregateProperties(
+         appId = appId,
+         entityType = "user"
+       )(sc)
+       .collectAsMap
+       .toMap
+
+     val wanted = Map(
+       "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+       "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+     )
+
+     aggregated must beEqualTo(wanted)
+   }
+
+   /** Aggregates "user" entities of `channelId` and expects only the u3 state. */
+   def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
+     val aggregated: Map[String, PropertyMap] = parEventClient
+       .aggregateProperties(
+         appId = appId,
+         channelId = Some(channelId),
+         entityType = "user"
+       )(sc)
+       .collectAsMap
+       .toMap
+
+     val wanted = Map("u3" -> PropertyMap(u3, u3BaseTime, u3LastTime))
+
+     aggregated must beEqualTo(wanted)
+   }
+
+   /** Writes r5 and r6 without a channel id and expects them next to `listOfEvents`. */
+   def write(parEventClient: PEvents) = {
+     val extraEvents = List(r5, r6)
+     parEventClient.write(sc.parallelize(extraEvents), appId)(sc)
+
+     // read back
+     val stored = parEventClient
+       .find(appId = appId)(sc)
+       .collect
+       .toList
+       .map(event => event.copy(eventId = None)) // ignore eventId
+
+     stored must containTheSameElementsAs(listOfEvents ++ extraEvents)
+   }
+
+   /** Writes r1, r5 and r6 into `channelId` and expects them next to `listOfEventsChannel`. */
+   def writeChannel(parEventClient: PEvents) = {
+     val extraEvents = List(r1, r5, r6)
+     parEventClient.write(sc.parallelize(extraEvents), appId, Some(channelId))(sc)
+
+     // read back
+     val stored = parEventClient
+       .find(
+         appId = appId,
+         channelId = Some(channelId)
+       )(sc)
+       .collect
+       .toList
+       .map(event => event.copy(eventId = None)) // ignore eventId
+
+     stored must containTheSameElementsAs(listOfEventsChannel ++ extraEvents)
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
new file mode 100644
index 0000000..4bf90cf
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/StorageTestUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import scalikejdbc._
+
+ /** Helpers for cleaning up after the JDBC storage specs. */
+ object StorageTestUtils {
+   val jdbcSourceName = "PGSQL"
+
+   /**
+    * Executes `drop table <table>` inside a ScalikeJDBC `DB autoCommit` block.
+    *
+    * The table name is interpolated into the statement as-is, without quoting or escaping.
+    */
+   def dropJDBCTable(table: String): Unit = {
+     val statement = s"drop table $table"
+     DB.autoCommit { implicit session =>
+       SQL(statement).execute().apply()
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/647b480b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
----------------------------------------------------------------------
diff --git a/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
new file mode 100644
index 0000000..2cb08e5
--- /dev/null
+++ b/storage/jdbc/src/test/scala/org/apache/predictionio/data/storage/jdbc/TestEvents.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.jdbc
+
+import org.apache.predictionio.data.storage.{DataMap, Event}
+import org.joda.time.{DateTime, DateTimeZone}
+
+ /**
+  * Event fixtures shared by the storage specs.
+  *
+  * Contains property events for three users (u1, u2, u3) together with the property JSON and
+  * first/last event times that the specs pass to PropertyMap as the expected aggregate, six
+  * unrelated events (r1-r6), and three events (tz1-tz3) that share one instant but use different
+  * time zone offsets.
+  *
+  * Every JSON literal is strictly valid: no trailing comma precedes a closing brace, so the
+  * fixtures do not rely on a lenient parser.
+  */
+ trait TestEvents {
+
+   // Instants (epoch milliseconds) of the first event of each user.
+   val u1BaseTime = new DateTime(654321)
+   val u2BaseTime = new DateTime(6543210)
+   val u3BaseTime = new DateTime(6543410)
+
+   // u1 events
+   val u1e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u1",
+     properties = DataMap(
+       """{
+ "a" : 1,
+ "b" : "value2",
+ "d" : [1, 2, 3]
+ }"""),
+     eventTime = u1BaseTime
+   )
+
+   val u1e2 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"a" : 2}"""),
+     eventTime = u1BaseTime.plusDays(1)
+   )
+
+   val u1e3 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value4"}"""),
+     eventTime = u1BaseTime.plusDays(2)
+   )
+
+   val u1e4 = u1e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"b" : null}"""),
+     eventTime = u1BaseTime.plusDays(3)
+   )
+
+   val u1e5 = u1e1.copy(
+     event = "$set",
+     properties = DataMap("""{"e" : "new"}"""),
+     eventTime = u1BaseTime.plusDays(4)
+   )
+
+   // Same instant as u1e5. The JSON reflects u1e1-u1e5: "a" overwritten by u1e2, "b" unset by
+   // u1e4, "e" added by u1e5.
+   val u1LastTime = u1BaseTime.plusDays(4)
+   val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+   // delete event for u1
+   val u1ed = u1e1.copy(
+     event = "$delete",
+     properties = DataMap(),
+     eventTime = u1BaseTime.plusDays(5)
+   )
+
+   // u2 events
+   val u2e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u2",
+     properties = DataMap(
+       """{
+ "a" : 21,
+ "b" : "value12",
+ "d" : [7, 5, 6]
+ }"""),
+     eventTime = u2BaseTime
+   )
+
+   val u2e2 = u2e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"a" : null}"""),
+     eventTime = u2BaseTime.plusDays(1)
+   )
+
+   val u2e3 = u2e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+     eventTime = u2BaseTime.plusDays(2)
+   )
+
+   // Same instant as u2e3. The JSON reflects u2e1-u2e3: "a" unset by u2e2, "b" and "g" set by u2e3.
+   val u2LastTime = u2BaseTime.plusDays(2)
+   val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
+
+   // u3 events
+   val u3e1 = Event(
+     event = "$set",
+     entityType = "user",
+     entityId = "u3",
+     properties = DataMap(
+       """{
+ "a" : 22,
+ "b" : "value13",
+ "d" : [5, 6, 1]
+ }"""),
+     eventTime = u3BaseTime
+   )
+
+   val u3e2 = u3e1.copy(
+     event = "$unset",
+     properties = DataMap("""{"a" : null}"""),
+     eventTime = u3BaseTime.plusDays(1)
+   )
+
+   val u3e3 = u3e1.copy(
+     event = "$set",
+     properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
+     eventTime = u3BaseTime.plusDays(2)
+   )
+
+   // Same instant as u3e3. The JSON reflects u3e1-u3e3: "a" unset by u3e2; "b", "f" and "d" set
+   // by u3e3.
+   val u3LastTime = u3BaseTime.plusDays(2)
+   val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
+
+   // some random events
+   val r1 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now,
+     prId = Some("my_prid")
+   )
+   // r2 passes only the three identifying fields; every other Event field keeps its default.
+   val r2 = Event(
+     event = "my_event2",
+     entityType = "my_entity_type2",
+     entityId = "my_entity_id2"
+   )
+   // r3 has no explicit eventTime either.
+   val r3 = Event(
+     event = "my_event3",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "propA" : 1.2345,
+ "propB" : "valueB"
+ }"""
+     ),
+     prId = Some("my_prid")
+   )
+   val r4 = Event(
+     event = "my_event4",
+     entityType = "my_entity_type4",
+     entityId = "my_entity_id4",
+     targetEntityType = Some("my_target_entity_type4"),
+     targetEntityId = Some("my_target_entity_id4"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""),
+     eventTime = DateTime.now
+   )
+   val r5 = Event(
+     event = "my_event5",
+     entityType = "my_entity_type5",
+     entityId = "my_entity_id5",
+     targetEntityType = Some("my_target_entity_type5"),
+     targetEntityId = Some("my_target_entity_id5"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now
+   )
+   val r6 = Event(
+     event = "my_event6",
+     entityType = "my_entity_type6",
+     entityId = "my_entity_id6",
+     targetEntityType = Some("my_target_entity_type6"),
+     targetEntityId = Some("my_target_entity_id6"),
+     properties = DataMap(
+       """{
+ "prop1" : 6,
+ "prop2" : "value2",
+ "prop3" : [6, 7, 8],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = DateTime.now
+   )
+
+   // timezone: tz1-tz3 use the same millisecond instant with offsets -08:00, +02:00 and +08:00
+   val tz1 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id0",
+     targetEntityType = Some("my_target_entity_type"),
+     targetEntityId = Some("my_target_entity_id"),
+     properties = DataMap(
+       """{
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c"],
+ "prop6" : 4.56
+ }"""
+     ),
+     eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+     prId = Some("my_prid")
+   )
+
+   val tz2 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id1",
+     eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+     prId = Some("my_prid")
+   )
+
+   val tz3 = Event(
+     event = "my_event",
+     entityType = "my_entity_type",
+     entityId = "my_entity_id2",
+     eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+     prId = Some("my_prid")
+   )
+
+ }