You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:38 UTC
[07/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala b/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
deleted file mode 100644
index 74614b2..0000000
--- a/data/src/test/scala/io/prediction/data/storage/PEventsSpec.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.specs2._
-import org.specs2.specification.Step
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-/** Specification for [[PEvents]] implementations.
- *
- * Test events are inserted through an [[LEvents]] client and then read back,
- * aggregated and written through the matching [[PEvents]] client on a local
- * Spark context. The same checks run against the HBase and the JDBC source.
- */
-class PEventsSpec extends Specification with TestEvents {
-
-  System.clearProperty("spark.driver.port")
-  System.clearProperty("spark.hostPort")
-  val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
-
-  val appId = 1
-  val channelId = 6
-  val dbName = "test_pio_storage_events_" + hashCode
-
-  def hbLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def hbPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.hbaseSourceName,
-    dbName
-  )
-
-  def jdbcLocal = Storage.getDataObject[LEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  def jdbcPar = Storage.getDataObject[PEvents](
-    StorageTestUtils.jdbcSourceName,
-    dbName
-  )
-
-  /** Stops the Spark context shared by all examples; run as the last step of `is`. */
-  def stopSpark = {
-    sc.stop()
-  }
-
-  def is = s2"""
-
- PredictionIO Storage PEvents Specification
-
- PEvents can be implemented by:
- - HBPEvents ${hbPEvents}
- - JDBCPEvents ${jdbcPEvents}
- - (stop Spark) ${Step(stopSpark)}
-
- """
-
-  def hbPEvents = sequential ^ s2"""
-
- HBPEvents should
- - behave like any PEvents implementation ${events(hbLocal, hbPar)}
- - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
-
- """
-
-  def jdbcPEvents = sequential ^ s2"""
-
- JDBCPEvents should
- - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
- - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
- - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
-
- """
-
-  /** Examples every PEvents implementation has to satisfy.
-   *
-   * @param localEventClient client used to initialise storage and insert the fixtures
-   * @param parEventClient   client under test
-   */
-  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
-
- - (init test) ${initTest(localEventClient)}
- - (insert test events) ${insertTestEvents(localEventClient)}
- find in default ${find(parEventClient)}
- find in channel ${findChannel(parEventClient)}
- aggregate user properties in default ${aggregateUserProperties(parEventClient)}
- aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
- write to default ${write(parEventClient)}
- write to channel ${writeChannel(parEventClient)}
-
- """
-
-  /* setup */
-
-  // events from TestEvents trait
-  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
-  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
-
-  /** Initialises storage for the app, once without and once with the test channel. */
-  def initTest(localEventClient: LEvents) = {
-    localEventClient.init(appId)
-    localEventClient.init(appId, Some(channelId))
-  }
-
-  /** Inserts the fixtures; the values returned by `insert` are not needed here. */
-  def insertTestEvents(localEventClient: LEvents) = {
-    // foreach rather than map: the inserts are run for their side effect only
-    listOfEvents.foreach(localEventClient.insert(_, appId))
-    // insert to channel
-    listOfEventsChannel.foreach(localEventClient.insert(_, appId, Some(channelId)))
-    success
-  }
-
-  /** Collects an event RDD into a list and resets every `eventId` to `None`,
-   * so the events can be compared with the fixtures (which carry no id).
-   */
-  private def collectWithoutIds(eventRDD: RDD[Event]): List[Event] =
-    eventRDD.collect.toList.map(_.copy(eventId = None))
-
-  /* following are tests */
-
-  def find(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    collectWithoutIds(resultRDD) must containTheSameElementsAs(listOfEvents)
-  }
-
-  def findChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[Event] = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    collectWithoutIds(resultRDD) must containTheSameElementsAs(listOfEventsChannel)
-  }
-
-  def aggregateUserProperties(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
-      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
-    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
-      appId = appId,
-      channelId = Some(channelId),
-      entityType = "user"
-    )(sc)
-    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
-
-    val expected = Map(
-      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
-    )
-
-    result must beEqualTo(expected)
-  }
-
-  def write(parEventClient: PEvents) = {
-    val written = List(r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId)(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId
-    )(sc)
-
-    // the events inserted earlier are expected to still be there
-    val expected = listOfEvents ++ written
-
-    collectWithoutIds(resultRDD) must containTheSameElementsAs(expected)
-  }
-
-  def writeChannel(parEventClient: PEvents) = {
-    val written = List(r1, r5, r6)
-    val writtenRDD = sc.parallelize(written)
-    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
-
-    // read back
-    val resultRDD = parEventClient.find(
-      appId = appId,
-      channelId = Some(channelId)
-    )(sc)
-
-    // the events inserted earlier are expected to still be there
-    val expected = listOfEventsChannel ++ written
-
-    collectWithoutIds(resultRDD) must containTheSameElementsAs(expected)
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala b/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
deleted file mode 100644
index 74615a1..0000000
--- a/data/src/test/scala/io/prediction/data/storage/StorageTestUtils.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import io.prediction.data.storage.hbase.HBLEvents
-import scalikejdbc._
-
-/** Source names and clean-up helpers used by the storage specs. */
-object StorageTestUtils {
-  val hbaseSourceName = "HBASE"
-  val jdbcSourceName = "PGSQL"
-
-  /** Disables and deletes every table listed for `namespace`, then deletes
-   * the namespace itself.
-   */
-  def dropHBaseNamespace(namespace: String): Unit = {
-    val hbEvents =
-      Storage.getDataObject[LEvents](hbaseSourceName, namespace).asInstanceOf[HBLEvents]
-    val hbAdmin = hbEvents.client.admin
-
-    for (table <- hbAdmin.listTableNamesByNamespace(namespace)) {
-      hbAdmin.disableTable(table)
-      hbAdmin.deleteTable(table)
-    }
-
-    // Only empty namespaces (no tables) can be removed.
-    hbAdmin.deleteNamespace(namespace)
-  }
-
-  /** Issues `drop table` for the given table name inside an auto-commit block. */
-  def dropJDBCTable(table: String): Unit = DB.autoCommit { implicit session =>
-    SQL(s"drop table $table").execute().apply()
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/storage/TestEvents.scala b/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
deleted file mode 100644
index 4fc2469..0000000
--- a/data/src/test/scala/io/prediction/data/storage/TestEvents.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.storage
-
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-
-/** Event fixtures shared by the storage specs.
- *
- * Holds `$set` / `$unset` / `$delete` events for the users u1, u2 and u3, the
- * JSON strings `u1`, `u2`, `u3` that PEventsSpec passes to `PropertyMap` as
- * the expected aggregation results, the generic events r1..r6, and tz1..tz3
- * whose event times are built with explicit time zones.
- */
-trait TestEvents {
-
- // base event time per user, built from a fixed long value
- val u1BaseTime = new DateTime(654321)
- val u2BaseTime = new DateTime(6543210)
- val u3BaseTime = new DateTime(6543410)
-
- // u1 events
- // NOTE(review): the JSON literal below ends with a trailing comma; presumably
- // DataMap's parser accepts it -- confirm before reusing it elsewhere
- val u1e1 = Event(
- event = "$set",
- entityType = "user",
- entityId = "u1",
- properties = DataMap(
- """{
- "a" : 1,
- "b" : "value2",
- "d" : [1, 2, 3],
- }"""),
- eventTime = u1BaseTime
- )
-
- // the following u1 events are copies of u1e1 with a different event name,
- // properties and event time (one day apart)
- val u1e2 = u1e1.copy(
- event = "$set",
- properties = DataMap("""{"a" : 2}"""),
- eventTime = u1BaseTime.plusDays(1)
- )
-
- val u1e3 = u1e1.copy(
- event = "$set",
- properties = DataMap("""{"b" : "value4"}"""),
- eventTime = u1BaseTime.plusDays(2)
- )
-
- val u1e4 = u1e1.copy(
- event = "$unset",
- properties = DataMap("""{"b" : null}"""),
- eventTime = u1BaseTime.plusDays(3)
- )
-
- val u1e5 = u1e1.copy(
- event = "$set",
- properties = DataMap("""{"e" : "new"}"""),
- eventTime = u1BaseTime.plusDays(4)
- )
-
- // same time as u1e5, the latest of u1e1..u1e5
- val u1LastTime = u1BaseTime.plusDays(4)
- // JSON used as the expected aggregated properties of u1 in PEventsSpec
- val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
-
- // delete event for u1
- val u1ed = u1e1.copy(
- event = "$delete",
- properties = DataMap(),
- eventTime = u1BaseTime.plusDays(5)
- )
-
- // u2 events
- val u2e1 = Event(
- event = "$set",
- entityType = "user",
- entityId = "u2",
- properties = DataMap(
- """{
- "a" : 21,
- "b" : "value12",
- "d" : [7, 5, 6],
- }"""),
- eventTime = u2BaseTime
- )
-
- val u2e2 = u2e1.copy(
- event = "$unset",
- properties = DataMap("""{"a" : null}"""),
- eventTime = u2BaseTime.plusDays(1)
- )
-
- val u2e3 = u2e1.copy(
- event = "$set",
- properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
- eventTime = u2BaseTime.plusDays(2)
- )
-
- // same time as u2e3, the latest u2 event
- val u2LastTime = u2BaseTime.plusDays(2)
- // JSON used as the expected aggregated properties of u2 in PEventsSpec
- val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
-
- // u3 events
- val u3e1 = Event(
- event = "$set",
- entityType = "user",
- entityId = "u3",
- properties = DataMap(
- """{
- "a" : 22,
- "b" : "value13",
- "d" : [5, 6, 1],
- }"""),
- eventTime = u3BaseTime
- )
-
- val u3e2 = u3e1.copy(
- event = "$unset",
- properties = DataMap("""{"a" : null}"""),
- eventTime = u3BaseTime.plusDays(1)
- )
-
- val u3e3 = u3e1.copy(
- event = "$set",
- properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
- eventTime = u3BaseTime.plusDays(2)
- )
-
- // same time as u3e3, the latest u3 event
- val u3LastTime = u3BaseTime.plusDays(2)
- // JSON used as the expected aggregated properties of u3 in PEventsSpec
- val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
-
- // some random events
- // (r1, r4, r5 and r6 take DateTime.now as event time, so their times differ per run)
- val r1 = Event(
- event = "my_event",
- entityType = "my_entity_type",
- entityId = "my_entity_id",
- targetEntityType = Some("my_target_entity_type"),
- targetEntityId = Some("my_target_entity_id"),
- properties = DataMap(
- """{
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c"],
- "prop6" : 4.56
- }"""
- ),
- eventTime = DateTime.now,
- prId = Some("my_prid")
- )
- // r2 passes only the event name and entity; every other field keeps Event's
- // default (defaults are declared outside this file)
- val r2 = Event(
- event = "my_event2",
- entityType = "my_entity_type2",
- entityId = "my_entity_id2"
- )
- // r3 passes no eventTime, so Event's default applies (not visible here)
- val r3 = Event(
- event = "my_event3",
- entityType = "my_entity_type",
- entityId = "my_entity_id",
- targetEntityType = Some("my_target_entity_type"),
- targetEntityId = Some("my_target_entity_id"),
- properties = DataMap(
- """{
- "propA" : 1.2345,
- "propB" : "valueB",
- }"""
- ),
- prId = Some("my_prid")
- )
- val r4 = Event(
- event = "my_event4",
- entityType = "my_entity_type4",
- entityId = "my_entity_id4",
- targetEntityType = Some("my_target_entity_type4"),
- targetEntityId = Some("my_target_entity_id4"),
- properties = DataMap(
- """{
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c"],
- "prop6" : 4.56
- }"""),
- eventTime = DateTime.now
- )
- val r5 = Event(
- event = "my_event5",
- entityType = "my_entity_type5",
- entityId = "my_entity_id5",
- targetEntityType = Some("my_target_entity_type5"),
- targetEntityId = Some("my_target_entity_id5"),
- properties = DataMap(
- """{
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c"],
- "prop6" : 4.56
- }"""
- ),
- eventTime = DateTime.now
- )
- val r6 = Event(
- event = "my_event6",
- entityType = "my_entity_type6",
- entityId = "my_entity_id6",
- targetEntityType = Some("my_target_entity_type6"),
- targetEntityId = Some("my_target_entity_id6"),
- properties = DataMap(
- """{
- "prop1" : 6,
- "prop2" : "value2",
- "prop3" : [6, 7, 8],
- "prop4" : true,
- "prop5" : ["a", "b", "c"],
- "prop6" : 4.56
- }"""
- ),
- eventTime = DateTime.now
- )
-
- // timezone
- // tz1..tz3 are built from the same long value (12345678) and differ in the
- // zone offset passed to DateTime (-08:00, +02:00, +08:00)
- val tz1 = Event(
- event = "my_event",
- entityType = "my_entity_type",
- entityId = "my_entity_id0",
- targetEntityType = Some("my_target_entity_type"),
- targetEntityId = Some("my_target_entity_id"),
- properties = DataMap(
- """{
- "prop1" : 1,
- "prop2" : "value2",
- "prop3" : [1, 2, 3],
- "prop4" : true,
- "prop5" : ["a", "b", "c"],
- "prop6" : 4.56
- }"""
- ),
- eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
- prId = Some("my_prid")
- )
-
- val tz2 = Event(
- event = "my_event",
- entityType = "my_entity_type",
- entityId = "my_entity_id1",
- eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
- prId = Some("my_prid")
- )
-
- val tz3 = Event(
- event = "my_event",
- entityType = "my_entity_type",
- entityId = "my_entity_id2",
- eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
- prId = Some("my_prid")
- )
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala b/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
deleted file mode 100644
index 4009e0f..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/ConnectorTestUtil.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks
-
-import org.specs2.execute.Result
-import org.specs2.mutable._
-
-import org.json4s.JObject
-import org.json4s.DefaultFormats
-import org.json4s.native.JsonMethods.parse
-import org.json4s.native.Serialization.write
-
-/** Test helpers shared by the webhooks connector specs.
- *
- * Both `check` overloads run a connector on some input and compare the
- * fields of the produced event JSON with the fields of an expected JSON
- * string, ignoring field order.
- */
-trait ConnectorTestUtil extends Specification {
-
-  implicit val formats = DefaultFormats
-
-  /** Checks a [[JsonConnector]].
-   *
-   * @param connector connector under test
-   * @param original  webhook payload as a JSON object string
-   * @param event     expected event as a JSON object string
-   */
-  def check(connector: JsonConnector, original: String, event: String): Result = {
-    val originalJson = parse(original).asInstanceOf[JObject]
-    sameFieldsAs(connector.toEventJson(originalJson), event)
-  }
-
-  /** Checks a [[FormConnector]].
-   *
-   * @param connector connector under test
-   * @param original  webhook payload as form fields
-   * @param event     expected event as a JSON object string
-   */
-  def check(connector: FormConnector, original: Map[String, String], event: String): Result =
-    sameFieldsAs(connector.toEventJson(original), event)
-
-  /** Serialises `converted`, parses it back and compares its top-level fields
-   * with those of the expected `event` JSON.
-   */
-  private def sameFieldsAs(converted: AnyRef, event: String): Result = {
-    val eventJson = parse(event).asInstanceOf[JObject]
-    // write and parse back to discard any JNothing field
-    val result = parse(write(converted)).asInstanceOf[JObject]
-    result.obj must containTheSameElementsAs(eventJson.obj)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
deleted file mode 100644
index 7f6ad8f..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.exampleform
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-/** Examples for ExampleFormConnector: each one feeds form fields to the
- * connector and compares the result with the expected event JSON.
- */
-class ExampleFormConnectorSpec extends Specification with ConnectorTestUtil {
-
-  "ExampleFormConnector" should {
-
-    "convert userAction to Event JSON" in {
-      // form fields sent by the webhook
-      val formFields = Map(
-        "type" -> "userAction",
-        "userId" -> "as34smg4",
-        "event" -> "do_something",
-        "context[ip]" -> "24.5.68.47", // optional
-        "context[prop1]" -> "2.345", // optional
-        "context[prop2]" -> "value1", // optional
-        "anotherProperty1" -> "100",
-        "anotherProperty2" -> "optional1", // optional
-        "timestamp" -> "2015-01-02T00:30:12.984Z"
-      )
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something",
- "entityType": "user",
- "entityId": "as34smg4",
- "properties": {
- "context": {
- "ip": "24.5.68.47",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherProperty1": 100,
- "anotherProperty2": "optional1"
- }
- "eventTime": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      check(ExampleFormConnector, formFields, eventJson)
-    }
-
-    "convert userAction without optional fields to Event JSON" in {
-      // form fields sent by the webhook
-      val formFields = Map(
-        "type" -> "userAction",
-        "userId" -> "as34smg4",
-        "event" -> "do_something",
-        "anotherProperty1" -> "100",
-        "timestamp" -> "2015-01-02T00:30:12.984Z"
-      )
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something",
- "entityType": "user",
- "entityId": "as34smg4",
- "properties": {
- "anotherProperty1": 100,
- }
- "eventTime": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      check(ExampleFormConnector, formFields, eventJson)
-    }
-
-    "convert userActionItem to Event JSON" in {
-      // form fields sent by the webhook
-      val formFields = Map(
-        "type" -> "userActionItem",
-        "userId" -> "as34smg4",
-        "event" -> "do_something_on",
-        "itemId" -> "kfjd312bc",
-        "context[ip]" -> "1.23.4.56",
-        "context[prop1]" -> "2.345",
-        "context[prop2]" -> "value1",
-        "anotherPropertyA" -> "4.567", // optional
-        "anotherPropertyB" -> "false", // optional
-        "timestamp" -> "2015-01-15T04:20:23.567Z"
-      )
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something_on",
- "entityType": "user",
- "entityId": "as34smg4",
- "targetEntityType": "item",
- "targetEntityId": "kfjd312bc"
- "properties": {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherPropertyA": 4.567
- "anotherPropertyB": false
- }
- "eventTime": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      check(ExampleFormConnector, formFields, eventJson)
-    }
-
-    "convert userActionItem without optional fields to Event JSON" in {
-      // form fields sent by the webhook
-      val formFields = Map(
-        "type" -> "userActionItem",
-        "userId" -> "as34smg4",
-        "event" -> "do_something_on",
-        "itemId" -> "kfjd312bc",
-        "context[ip]" -> "1.23.4.56",
-        "context[prop1]" -> "2.345",
-        "context[prop2]" -> "value1",
-        "timestamp" -> "2015-01-15T04:20:23.567Z"
-      )
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something_on",
- "entityType": "user",
- "entityId": "as34smg4",
- "targetEntityType": "item",
- "targetEntityId": "kfjd312bc"
- "properties": {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- }
- }
- "eventTime": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      check(ExampleFormConnector, formFields, eventJson)
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
deleted file mode 100644
index bdf1cc4..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.examplejson
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-/** Examples for ExampleJsonConnector: each one feeds a JSON payload to the
- * connector and compares the result with the expected event JSON.
- */
-class ExampleJsonConnectorSpec extends Specification with ConnectorTestUtil {
-
-  "ExampleJsonConnector" should {
-
-    "convert userAction to Event JSON" in {
-      // JSON payload sent by the webhook
-      val payload = """
- {
- "type": "userAction"
- "userId": "as34smg4",
- "event": "do_something",
- "context": {
- "ip": "24.5.68.47",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherProperty1": 100,
- "anotherProperty2": "optional1",
- "timestamp": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something",
- "entityType": "user",
- "entityId": "as34smg4",
- "properties": {
- "context": {
- "ip": "24.5.68.47",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherProperty1": 100,
- "anotherProperty2": "optional1"
- }
- "eventTime": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      check(ExampleJsonConnector, payload, eventJson)
-    }
-
-    "convert userAction without optional field to Event JSON" in {
-      // JSON payload sent by the webhook
-      val payload = """
- {
- "type": "userAction"
- "userId": "as34smg4",
- "event": "do_something",
- "anotherProperty1": 100,
- "timestamp": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something",
- "entityType": "user",
- "entityId": "as34smg4",
- "properties": {
- "anotherProperty1": 100,
- }
- "eventTime": "2015-01-02T00:30:12.984Z"
- }
- """
-
-      check(ExampleJsonConnector, payload, eventJson)
-    }
-
-    "convert userActionItem to Event JSON" in {
-      // JSON payload sent by the webhook
-      val payload = """
- {
- "type": "userActionItem"
- "userId": "as34smg4",
- "event": "do_something_on",
- "itemId": "kfjd312bc",
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherPropertyA": 4.567
- "anotherPropertyB": false
- "timestamp": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something_on",
- "entityType": "user",
- "entityId": "as34smg4",
- "targetEntityType": "item",
- "targetEntityId": "kfjd312bc"
- "properties": {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- },
- "anotherPropertyA": 4.567
- "anotherPropertyB": false
- }
- "eventTime": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      check(ExampleJsonConnector, payload, eventJson)
-    }
-
-    "convert userActionItem without optional fields to Event JSON" in {
-      // JSON payload sent by the webhook
-      val payload = """
- {
- "type": "userActionItem"
- "userId": "as34smg4",
- "event": "do_something_on",
- "itemId": "kfjd312bc",
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- }
- "timestamp": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      // event JSON the connector should produce
-      val eventJson = """
- {
- "event": "do_something_on",
- "entityType": "user",
- "entityId": "as34smg4",
- "targetEntityType": "item",
- "targetEntityId": "kfjd312bc"
- "properties": {
- "context": {
- "ip": "1.23.4.56",
- "prop1": 2.345
- "prop2": "value1"
- }
- }
- "eventTime": "2015-01-15T04:20:23.567Z"
- }
- """
-
-      check(ExampleJsonConnector, payload, eventJson)
-    }
-
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
deleted file mode 100644
index 56484c2..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
+++ /dev/null
@@ -1,254 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.mailchimp
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-/** Examples for MailChimpConnector: each one feeds the form fields of one
- * MailChimp webhook type to the connector and compares the result with the
- * expected event JSON.
- */
-class MailChimpConnectorSpec extends Specification with ConnectorTestUtil {
-
-  // TODO: test other events
-  // TODO: test different optional fields
-
-  "MailChimpConnector" should {
-
-    // subscribe
-    "convert subscribe to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "subscribe",
-        "fired_at" -> "2009-03-26 21:35:57",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", // optional
-        "data[ip_opt]" -> "10.20.10.30",
-        "data[ip_signup]" -> "10.20.10.30"
-      )
-
-      val eventJson = """
- {
- "event" : "subscribe",
- "entityType" : "user",
- "entityId" : "8a25ff1d98",
- "targetEntityType" : "list",
- "targetEntityId" : "a6b5da1054",
- "properties" : {
- "email" : "api@mailchimp.com",
- "email_type" : "html",
- "merges" : {
- "EMAIL" : "api@mailchimp.com",
- "FNAME" : "MailChimp",
- "LNAME" : "API"
- "INTERESTS" : "Group1,Group2"
- },
- "ip_opt" : "10.20.10.30",
- "ip_signup" : "10.20.10.30"
- },
- "eventTime" : "2009-03-26T21:35:57.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-    // unsubscribe
-    "convert unsubscribe to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "unsubscribe",
-        "fired_at" -> "2009-03-26 21:40:57",
-        "data[action]" -> "unsub",
-        "data[reason]" -> "manual",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api+unsub@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api+unsub@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", // optional
-        "data[ip_opt]" -> "10.20.10.30",
-        "data[campaign_id]" -> "cb398d21d2"
-      )
-
-      val eventJson = """
- {
- "event" : "unsubscribe",
- "entityType" : "user",
- "entityId" : "8a25ff1d98",
- "targetEntityType" : "list",
- "targetEntityId" : "a6b5da1054",
- "properties" : {
- "action" : "unsub",
- "reason" : "manual",
- "email" : "api+unsub@mailchimp.com",
- "email_type" : "html",
- "merges" : {
- "EMAIL" : "api+unsub@mailchimp.com",
- "FNAME" : "MailChimp",
- "LNAME" : "API"
- "INTERESTS" : "Group1,Group2"
- },
- "ip_opt" : "10.20.10.30",
- "campaign_id" : "cb398d21d2"
- },
- "eventTime" : "2009-03-26T21:40:57.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-    // profile update
-    "convert profile update to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "profile",
-        "fired_at" -> "2009-03-26 21:31:21",
-        "data[id]" -> "8a25ff1d98",
-        "data[list_id]" -> "a6b5da1054",
-        "data[email]" -> "api@mailchimp.com",
-        "data[email_type]" -> "html",
-        "data[merges][EMAIL]" -> "api@mailchimp.com",
-        "data[merges][FNAME]" -> "MailChimp",
-        "data[merges][LNAME]" -> "API",
-        "data[merges][INTERESTS]" -> "Group1,Group2", // optional
-        "data[ip_opt]" -> "10.20.10.30"
-      )
-
-      val eventJson = """
- {
- "event" : "profile",
- "entityType" : "user",
- "entityId" : "8a25ff1d98",
- "targetEntityType" : "list",
- "targetEntityId" : "a6b5da1054",
- "properties" : {
- "email" : "api@mailchimp.com",
- "email_type" : "html",
- "merges" : {
- "EMAIL" : "api@mailchimp.com",
- "FNAME" : "MailChimp",
- "LNAME" : "API"
- "INTERESTS" : "Group1,Group2"
- },
- "ip_opt" : "10.20.10.30"
- },
- "eventTime" : "2009-03-26T21:31:21.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-    // email update
-    "convert email update to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "upemail",
-        "fired_at" -> "2009-03-26 22:15:09",
-        "data[list_id]" -> "a6b5da1054",
-        "data[new_id]" -> "51da8c3259",
-        "data[new_email]" -> "api+new@mailchimp.com",
-        "data[old_email]" -> "api+old@mailchimp.com"
-      )
-
-      val eventJson = """
- {
- "event" : "upemail",
- "entityType" : "user",
- "entityId" : "51da8c3259",
- "targetEntityType" : "list",
- "targetEntityId" : "a6b5da1054",
- "properties" : {
- "new_email" : "api+new@mailchimp.com",
- "old_email" : "api+old@mailchimp.com"
- },
- "eventTime" : "2009-03-26T22:15:09.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-    // cleaned email
-    "convert cleaned email to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "cleaned",
-        "fired_at" -> "2009-03-26 22:01:00",
-        "data[list_id]" -> "a6b5da1054",
-        "data[campaign_id]" -> "4fjk2ma9xd",
-        "data[reason]" -> "hard",
-        "data[email]" -> "api+cleaned@mailchimp.com"
-      )
-
-      val eventJson = """
- {
- "event" : "cleaned",
- "entityType" : "list",
- "entityId" : "a6b5da1054",
- "properties" : {
- "campaignId" : "4fjk2ma9xd",
- "reason" : "hard",
- "email" : "api+cleaned@mailchimp.com"
- },
- "eventTime" : "2009-03-26T22:01:00.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-    // campaign sending status
-    "convert campaign sending status to event JSON" in {
-
-      val formFields = Map(
-        "type" -> "campaign",
-        "fired_at" -> "2009-03-26 22:15:09",
-        "data[id]" -> "5aa2102003",
-        "data[subject]" -> "Test Campaign Subject",
-        "data[status]" -> "sent",
-        "data[reason]" -> "",
-        "data[list_id]" -> "a6b5da1054"
-      )
-
-      val eventJson = """
- {
- "event" : "campaign",
- "entityType" : "campaign",
- "entityId" : "5aa2102003",
- "targetEntityType" : "list",
- "targetEntityId" : "a6b5da1054",
- "properties" : {
- "subject" : "Test Campaign Subject",
- "status" : "sent",
- "reason" : ""
- },
- "eventTime" : "2009-03-26T22:15:09.000Z"
- }
- """
-
-      check(MailChimpConnector, formFields, eventJson)
-    }
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala b/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
deleted file mode 100644
index d7587cd..0000000
--- a/data/src/test/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.segmentio
-
-import io.prediction.data.webhooks.ConnectorTestUtil
-
-import org.specs2.mutable._
-
-class SegmentIOConnectorSpec extends Specification with ConnectorTestUtil {
-
- // TODO: test different optional fields
-
- val commonFields =
- s"""
- | "anonymous_id": "id",
- | "sent_at": "sendAt",
- | "version": "2",
- """.stripMargin
-
- "SegmentIOConnector" should {
-
- "convert group with context to event JSON" in {
- val context =
- """
- | "context": {
- | "app": {
- | "name": "InitechGlobal",
- | "version": "545",
- | "build": "3.0.1.545"
- | },
- | "campaign": {
- | "name": "TPS Innovation Newsletter",
- | "source": "Newsletter",
- | "medium": "email",
- | "term": "tps reports",
- | "content": "image link"
- | },
- | "device": {
- | "id": "B5372DB0-C21E-11E4-8DFC-AA07A5B093DB",
- | "advertising_id": "7A3CBEA0-BDF5-11E4-8DFC-AA07A5B093DB",
- | "ad_tracking_enabled": true,
- | "manufacturer": "Apple",
- | "model": "iPhone7,2",
- | "name": "maguro",
- | "type": "ios",
- | "token": "ff15bc0c20c4aa6cd50854ff165fd265c838e5405bfeb9571066395b8c9da449"
- | },
- | "ip": "8.8.8.8",
- | "library": {
- | "name": "analytics-ios",
- | "version": "1.8.0"
- | },
- | "network": {
- | "bluetooth": false,
- | "carrier": "T-Mobile NL",
- | "cellular": true,
- | "wifi": false
- | },
- | "location": {
- | "city": "San Francisco",
- | "country": "United States",
- | "latitude": 40.2964197,
- | "longitude": -76.9411617,
- | "speed": 0
- | },
- | "os": {
- | "name": "iPhone OS",
- | "version": "8.1.3"
- | },
- | "referrer": {
- | "id": "ABCD582CDEFFFF01919",
- | "type": "dataxu"
- | },
- | "screen": {
- | "width": 320,
- | "height": 568,
- | "density": 2
- | },
- | "timezone": "Europe/Amsterdam",
- | "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
- | }
- """.stripMargin
-
- val group =
- s"""
- |{ $commonFields
- | "type": "group",
- | "group_id": "groupId",
- | "user_id": "userIdValue",
- | "timestamp" : "2012-12-02T00:30:08.276Z",
- | "traits": {
- | "name": "groupName",
- | "employees": 329,
- | },
- | $context
- |}
- """.stripMargin
-
- val expected =
- s"""
- |{
- | "event": "group",
- | "entityType": "user",
- | "entityId": "userIdValue",
- | "properties": {
- | $context,
- | "group_id": "groupId",
- | "traits": {
- | "name": "groupName",
- | "employees": 329
- | },
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, group, expected)
- }
-
- "convert group to event JSON" in {
- val group =
- s"""
- |{ $commonFields
- | "type": "group",
- | "group_id": "groupId",
- | "user_id": "userIdValue",
- | "timestamp" : "2012-12-02T00:30:08.276Z",
- | "traits": {
- | "name": "groupName",
- | "employees": 329,
- | }
- |}
- """.stripMargin
-
- val expected =
- """
- |{
- | "event": "group",
- | "entityType": "user",
- | "entityId": "userIdValue",
- | "properties": {
- | "group_id": "groupId",
- | "traits": {
- | "name": "groupName",
- | "employees": 329
- | }
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, group, expected)
- }
-
- "convert screen to event JSON" in {
- val screen =
- s"""
- |{ $commonFields
- | "type": "screen",
- | "name": "screenName",
- | "user_id": "userIdValue",
- | "timestamp" : "2012-12-02T00:30:08.276Z",
- | "properties": {
- | "variation": "screenVariation"
- | }
- |}
- """.stripMargin
-
- val expected =
- """
- |{
- | "event": "screen",
- | "entityType": "user",
- | "entityId": "userIdValue",
- | "properties": {
- | "properties": {
- | "variation": "screenVariation"
- | },
- | "name": "screenName"
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, screen, expected)
- }
-
- "convert page to event JSON" in {
- val page =
- s"""
- |{ $commonFields
- | "type": "page",
- | "name": "pageName",
- | "user_id": "userIdValue",
- | "timestamp" : "2012-12-02T00:30:08.276Z",
- | "properties": {
- | "title": "pageTitle",
- | "url": "pageUrl"
- | }
- |}
- """.stripMargin
-
- val expected =
- """
- |{
- | "event": "page",
- | "entityType": "user",
- | "entityId": "userIdValue",
- | "properties": {
- | "properties": {
- | "title": "pageTitle",
- | "url": "pageUrl"
- | },
- | "name": "pageName"
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, page, expected)
- }
-
- "convert alias to event JSON" in {
- val alias =
- s"""
- |{ $commonFields
- | "type": "alias",
- | "previous_id": "previousIdValue",
- | "user_id": "userIdValue",
- | "timestamp" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- val expected =
- """
- |{
- | "event": "alias",
- | "entityType": "user",
- | "entityId": "userIdValue",
- | "properties": {
- | "previous_id" : "previousIdValue"
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, alias, expected)
- }
-
- "convert track to event JSON" in {
- val track =
- s"""
- |{ $commonFields
- | "user_id": "some_user_id",
- | "type": "track",
- | "event": "Registered",
- | "timestamp" : "2012-12-02T00:30:08.276Z",
- | "properties": {
- | "plan": "Pro Annual",
- | "accountType" : "Facebook"
- | }
- |}
- """.stripMargin
-
- val expected =
- """
- |{
- | "event": "track",
- | "entityType": "user",
- | "entityId": "some_user_id",
- | "properties": {
- | "event": "Registered",
- | "properties": {
- | "plan": "Pro Annual",
- | "accountType": "Facebook"
- | }
- | },
- | "eventTime" : "2012-12-02T00:30:08.276Z"
- |}
- """.stripMargin
-
- check(SegmentIOConnector, track, expected)
- }
-
- "convert identify to event JSON" in {
- val identify = s"""
- { $commonFields
- "type" : "identify",
- "user_id" : "019mr8mf4r",
- "traits" : {
- "email" : "achilles@segment.com",
- "name" : "Achilles",
- "subscription_plan" : "Premium",
- "friendCount" : 29
- },
- "timestamp" : "2012-12-02T00:30:08.276Z"
- }
- """
-
- val expected = """
- {
- "event" : "identify",
- "entityType": "user",
- "entityId" : "019mr8mf4r",
- "properties" : {
- "traits" : {
- "email" : "achilles@segment.com",
- "name" : "Achilles",
- "subscription_plan" : "Premium",
- "friendCount" : 29
- }
- },
- "eventTime" : "2012-12-02T00:30:08.276Z"
- }
- """
-
- check(SegmentIOConnector, identify, expected)
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
new file mode 100644
index 0000000..62fd89c
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/api/EventServiceSpec.scala
@@ -0,0 +1,68 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Storage
+
+import akka.testkit.TestProbe
+import akka.actor.ActorSystem
+import akka.actor.Props
+
+import spray.http.HttpEntity
+import spray.http.HttpResponse
+import spray.http.ContentTypes
+import spray.httpx.RequestBuilding.Get
+
+import org.specs2.mutable.Specification
+
+class EventServiceSpec extends Specification {
+
+ val system = ActorSystem("EventServiceSpecSystem")
+
+ val eventClient = Storage.getLEvents()
+ val accessKeysClient = Storage.getMetaDataAccessKeys()
+ val channelsClient = Storage.getMetaDataChannels()
+
+ val eventServiceActor = system.actorOf(
+ Props(
+ new EventServiceActor(
+ eventClient,
+ accessKeysClient,
+ channelsClient,
+ EventServerConfig()
+ )
+ )
+ )
+
+ "GET / request" should {
+ "properly produce OK HttpResponses" in {
+ val probe = TestProbe()(system)
+ probe.send(eventServiceActor, Get("/"))
+ probe.expectMsg(
+ HttpResponse(
+ 200,
+ HttpEntity(
+ contentType = ContentTypes.`application/json`,
+ string = """{"status":"alive"}"""
+ )
+ )
+ )
+ success
+ }
+ }
+
+ step(system.shutdown())
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
new file mode 100644
index 0000000..bae0f0b
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/api/SegmentIOAuthSpec.scala
@@ -0,0 +1,175 @@
+package org.apache.predictionio.data.api
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.apache.predictionio.data.storage._
+import org.joda.time.DateTime
+import org.specs2.mutable.Specification
+import spray.http.HttpHeaders.RawHeader
+import spray.http.{ContentTypes, HttpEntity, HttpResponse}
+import spray.httpx.RequestBuilding._
+import sun.misc.BASE64Encoder
+
+import scala.concurrent.{Future, ExecutionContext}
+
+class SegmentIOAuthSpec extends Specification {
+
+ val system = ActorSystem("EventServiceSpecSystem")
+ sequential
+ isolated
+ val eventClient = new LEvents {
+ override def init(appId: Int, channelId: Option[Int]): Boolean = true
+
+ override def futureInsert(event: Event, appId: Int, channelId: Option[Int])
+ (implicit ec: ExecutionContext): Future[String] =
+ Future successful "event_id"
+
+ override def futureFind(
+ appId: Int, channelId: Option[Int], startTime: Option[DateTime],
+ untilTime: Option[DateTime], entityType: Option[String],
+ entityId: Option[String], eventNames: Option[Seq[String]],
+ targetEntityType: Option[Option[String]],
+ targetEntityId: Option[Option[String]], limit: Option[Int],
+ reversed: Option[Boolean])
+ (implicit ec: ExecutionContext): Future[Iterator[Event]] =
+ Future successful List.empty[Event].iterator
+
+ override def futureGet(eventId: String, appId: Int, channelId: Option[Int])
+ (implicit ec: ExecutionContext): Future[Option[Event]] =
+ Future successful None
+
+ override def remove(appId: Int, channelId: Option[Int]): Boolean = true
+
+ override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])
+ (implicit ec: ExecutionContext): Future[Boolean] =
+ Future successful true
+
+ override def close(): Unit = {}
+ }
+ val appId = 0
+  val accessKeysClient = new AccessKeys {
+    override def insert(k: AccessKey): Option[String] = None
+    override def getByAppid(appid: Int): Seq[AccessKey] = Seq.empty
+    override def update(k: AccessKey): Unit = {}
+    override def delete(k: String): Unit = {}
+    override def getAll(): Seq[AccessKey] = Seq.empty
+    // Stub lookup: only the key "abc" resolves (to `appId`); any other key yields None.
+    override def get(k: String): Option[AccessKey] =
+      k match {
+        case "abc" => Some(AccessKey(k, appId, Seq.empty))
+        case _ => None
+      }
+  }
+
+ val channelsClient = Storage.getMetaDataChannels()
+ val eventServiceActor = system.actorOf(
+ Props(
+ new EventServiceActor(
+ eventClient,
+ accessKeysClient,
+ channelsClient,
+ EventServerConfig()
+ )
+ )
+ )
+
+ val base64Encoder = new BASE64Encoder
+
+ "Event Service" should {
+
+ "reject with CredentialsRejected with invalid credentials" in {
+ val accessKey = "abc123:"
+ val probe = TestProbe()(system)
+ probe.send(
+ eventServiceActor,
+ Post("/webhooks/segmentio.json")
+ .withHeaders(
+ List(
+ RawHeader("Authorization", s"Basic $accessKey")
+ )
+ )
+ )
+ probe.expectMsg(
+ HttpResponse(
+ 401,
+ HttpEntity(
+ contentType = ContentTypes.`application/json`,
+ string = """{"message":"Invalid accessKey."}"""
+ )
+ )
+ )
+ success
+ }
+
+ "reject with CredentialsMissed without credentials" in {
+ val probe = TestProbe()(system)
+ probe.send(
+ eventServiceActor,
+ Post("/webhooks/segmentio.json")
+ )
+ probe.expectMsg(
+ HttpResponse(
+ 401,
+ HttpEntity(
+ contentType = ContentTypes.`application/json`,
+ string = """{"message":"Missing accessKey."}"""
+ )
+ )
+ )
+ success
+ }
+
+ "process SegmentIO identity request properly" in {
+ val jsonReq =
+ """
+ |{
+ | "anonymous_id": "507f191e810c19729de860ea",
+ | "channel": "browser",
+ | "context": {
+ | "ip": "8.8.8.8",
+ | "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
+ | },
+ | "message_id": "022bb90c-bbac-11e4-8dfc-aa07a5b093db",
+ | "timestamp": "2015-02-23T22:28:55.387Z",
+ | "sent_at": "2015-02-23T22:28:55.111Z",
+ | "traits": {
+ | "name": "Peter Gibbons",
+ | "email": "peter@initech.com",
+ | "plan": "premium",
+ | "logins": 5
+ | },
+ | "type": "identify",
+ | "user_id": "97980cfea0067",
+ | "version": "2"
+ |}
+ """.stripMargin
+      // encode (unlike encodeBuffer) appends no line terminator to the header value
+      val accessKey = "abc:"
+      val accessKeyEncoded = base64Encoder.encode(accessKey.getBytes)
+ val probe = TestProbe()(system)
+ probe.send(
+ eventServiceActor,
+ Post(
+ "/webhooks/segmentio.json",
+ HttpEntity(ContentTypes.`application/json`, jsonReq.getBytes)
+ ).withHeaders(
+ List(
+ RawHeader("Authorization", s"Basic $accessKeyEncoded")
+ )
+ )
+ )
+ probe.expectMsg(
+ HttpResponse(
+ 201,
+ HttpEntity(
+ contentType = ContentTypes.`application/json`,
+ string = """{"eventId":"event_id"}"""
+ )
+ )
+ )
+ success
+ }
+ }
+
+ step(system.shutdown())
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
new file mode 100644
index 0000000..c98c882
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/BiMapSpec.scala
@@ -0,0 +1,196 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
+
+class BiMapSpec extends Specification {
+
+ System.clearProperty("spark.driver.port")
+ System.clearProperty("spark.hostPort")
+ val sc = new SparkContext("local[4]", "BiMapSpec test")
+
+ "BiMap created with map" should {
+
+ val keys = Seq(1, 4, 6)
+ val orgValues = Seq(2, 5, 7)
+ val org = keys.zip(orgValues).toMap
+ val bi = BiMap(org)
+
+ "return correct values for each key of original map" in {
+ val biValues = keys.map(k => bi(k))
+
+ biValues must beEqualTo(orgValues)
+ }
+
+ "get return Option[V]" in {
+ val checkKeys = keys ++ Seq(12345)
+ val biValues = checkKeys.map(k => bi.get(k))
+ val expected = orgValues.map(Some(_)) ++ Seq(None)
+
+ biValues must beEqualTo(expected)
+ }
+
+ "getOrElse return value for each key of original map" in {
+ val biValues = keys.map(k => bi.getOrElse(k, -1))
+
+ biValues must beEqualTo(orgValues)
+ }
+
+ "getOrElse return default values for invalid key" in {
+ val keys = Seq(999, -1, -2)
+ val defaults = Seq(1234, 5678, 987)
+ val biValues = keys.zip(defaults).map{ case (k,d) => bi.getOrElse(k, d) }
+
+ biValues must beEqualTo(defaults)
+ }
+
+ "contains() returns true/false correctly" in {
+ val checkKeys = keys ++ Seq(12345)
+ val biValues = checkKeys.map(k => bi.contains(k))
+ val expected = orgValues.map(_ => true) ++ Seq(false)
+
+ biValues must beEqualTo(expected)
+ }
+
+ "same size as original map" in {
+ (bi.size) must beEqualTo(org.size)
+ }
+
+ "take(2) returns BiMap of size 2" in {
+ bi.take(2).size must beEqualTo(2)
+ }
+
+ "toMap contain same element as original map" in {
+ (bi.toMap) must beEqualTo(org)
+ }
+
+ "toSeq contain same element as original map" in {
+ (bi.toSeq) must containTheSameElementsAs(org.toSeq)
+ }
+
+ "inverse and return correct keys for each values of original map" in {
+ val biKeys = orgValues.map(v => bi.inverse(v))
+ biKeys must beEqualTo(keys)
+ }
+
+ "inverse with same size" in {
+ bi.inverse.size must beEqualTo(org.size)
+ }
+
+ "inverse's inverse reference back to the same original object" in {
+      // NOTE: reference equality (`eq`), not structural `==`
+      bi.inverse.inverse eq bi
+ }
+ }
+
+ "BiMap created with duplicated values in map" should {
+ val dup = Map(1 -> 2, 4 -> 7, 6 -> 7)
+ "return IllegalArgumentException" in {
+ BiMap(dup) must throwA[IllegalArgumentException]
+ }
+ }
+
+ "BiMap.stringLong and stringInt" should {
+
+ "create BiMap from set of string" in {
+ val keys = Set("a", "b", "foo", "bar")
+ val values: Seq[Long] = Seq(0, 1, 2, 3)
+
+ val bi = BiMap.stringLong(keys)
+ val biValues = keys.map(k => bi(k))
+
+ val biInt = BiMap.stringInt(keys)
+ val valuesInt: Seq[Int] = values.map(_.toInt)
+ val biIntValues = keys.map(k => biInt(k))
+
+ biValues must containTheSameElementsAs(values) and
+ (biIntValues must containTheSameElementsAs(valuesInt))
+ }
+
+ "create BiMap from Array of unique string" in {
+ val keys = Array("a", "b", "foo", "bar")
+ val values: Seq[Long] = Seq(0, 1, 2, 3)
+
+ val bi = BiMap.stringLong(keys)
+ val biValues = keys.toSeq.map(k => bi(k))
+
+ val biInt = BiMap.stringInt(keys)
+ val valuesInt: Seq[Int] = values.map(_.toInt)
+ val biIntValues = keys.toSeq.map(k => biInt(k))
+
+ biValues must containTheSameElementsAs(values) and
+ (biIntValues must containTheSameElementsAs(valuesInt))
+ }
+
+ "not guarantee sequential index for Array with duplicated string" in {
+ val keys = Array("a", "b", "foo", "bar", "a", "b", "x")
+ val dupValues: Seq[Long] = Seq(0, 1, 2, 3, 4, 5, 6)
+ val values = keys.zip(dupValues).toMap.values.toSeq
+
+ val bi = BiMap.stringLong(keys)
+ val biValues = keys.toSet[String].map(k => bi(k))
+
+ val biInt = BiMap.stringInt(keys)
+ val valuesInt: Seq[Int] = values.map(_.toInt)
+ val biIntValues = keys.toSet[String].map(k => biInt(k))
+
+ biValues must containTheSameElementsAs(values) and
+ (biIntValues must containTheSameElementsAs(valuesInt))
+ }
+
+ "create BiMap from RDD[String]" in {
+
+ val keys = Seq("a", "b", "foo", "bar")
+ val values: Seq[Long] = Seq(0, 1, 2, 3)
+ val rdd = sc.parallelize(keys)
+
+ val bi = BiMap.stringLong(rdd)
+ val biValues = keys.map(k => bi(k))
+
+ val biInt = BiMap.stringInt(rdd)
+ val valuesInt: Seq[Int] = values.map(_.toInt)
+ val biIntValues = keys.map(k => biInt(k))
+
+ biValues must containTheSameElementsAs(values) and
+ (biIntValues must containTheSameElementsAs(valuesInt))
+ }
+
+ "create BiMap from RDD[String] with duplicated string" in {
+
+ val keys = Seq("a", "b", "foo", "bar", "a", "b", "x")
+ val values: Seq[Long] = Seq(0, 1, 2, 3, 4)
+ val rdd = sc.parallelize(keys)
+
+ val bi = BiMap.stringLong(rdd)
+ val biValues = keys.distinct.map(k => bi(k))
+
+ val biInt = BiMap.stringInt(rdd)
+ val valuesInt: Seq[Int] = values.map(_.toInt)
+ val biIntValues = keys.distinct.map(k => biInt(k))
+
+ biValues must containTheSameElementsAs(values) and
+ (biIntValues must containTheSameElementsAs(valuesInt))
+ }
+ }
+
+ step(sc.stop())
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
new file mode 100644
index 0000000..46ae8dd
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/DataMapSpec.scala
@@ -0,0 +1,243 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+class DataMapSpec extends Specification {
+
+ "DataMap" should {
+
+ val properties = DataMap("""
+ {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c", "c"],
+ "prop6" : 4.56
+ }
+ """)
+
+ "get Int data" in {
+ properties.get[Int]("prop1") must beEqualTo(1)
+ properties.getOpt[Int]("prop1") must beEqualTo(Some(1))
+ }
+
+ "get String data" in {
+ properties.get[String]("prop2") must beEqualTo("value2")
+ properties.getOpt[String]("prop2") must beEqualTo(Some("value2"))
+ }
+
+ "get List of Int data" in {
+ properties.get[List[Int]]("prop3") must beEqualTo(List(1,2,3))
+ properties.getOpt[List[Int]]("prop3") must beEqualTo(Some(List(1,2,3)))
+ }
+
+ "get Boolean data" in {
+ properties.get[Boolean]("prop4") must beEqualTo(true)
+ properties.getOpt[Boolean]("prop4") must beEqualTo(Some(true))
+ }
+
+ "get List of String data" in {
+ properties.get[List[String]]("prop5") must beEqualTo(List("a", "b", "c", "c"))
+ properties.getOpt[List[String]]("prop5") must beEqualTo(Some(List("a", "b", "c", "c")))
+ }
+
+ "get Set of String data" in {
+ properties.get[Set[String]]("prop5") must beEqualTo(Set("a", "b", "c"))
+ properties.getOpt[Set[String]]("prop5") must beEqualTo(Some(Set("a", "b", "c")))
+ }
+
+ "get Double data" in {
+ properties.get[Double]("prop6") must beEqualTo(4.56)
+ properties.getOpt[Double]("prop6") must beEqualTo(Some(4.56))
+ }
+
+ "get empty optional Int data" in {
+ properties.getOpt[Int]("prop9999") must beEqualTo(None)
+ }
+
+ }
+
+ "DataMap with multi-level data" should {
+    val properties = DataMap("""
+      {
+        "context": {
+          "ip": "1.23.4.56",
+          "prop1": 2.345,
+          "prop2": "value1",
+          "prop4": [1, 2, 3]
+        },
+        "anotherPropertyA": 4.567,
+        "anotherPropertyB": false
+      }
+    """)
+
+ "get case class data" in {
+ val expected = DataMapSpec.Context(
+ ip = "1.23.4.56",
+ prop1 = Some(2.345),
+ prop2 = Some("value1"),
+ prop3 = None,
+ prop4 = List(1,2,3)
+ )
+
+ properties.get[DataMapSpec.Context]("context") must beEqualTo(expected)
+ }
+
+ "get empty optional case class data" in {
+ properties.getOpt[DataMapSpec.Context]("context999") must beEqualTo(None)
+ }
+
+ "get double data" in {
+ properties.get[Double]("anotherPropertyA") must beEqualTo(4.567)
+ }
+
+ "get boolean data" in {
+ properties.get[Boolean]("anotherPropertyB") must beEqualTo(false)
+ }
+ }
+
+ "DataMap extract" should {
+
+ "extract to case class object" in {
+ val properties = DataMap("""
+ {
+ "prop1" : 1,
+ "prop2" : "value2",
+ "prop3" : [1, 2, 3],
+ "prop4" : true,
+ "prop5" : ["a", "b", "c", "c"],
+ "prop6" : 4.56
+ }
+ """)
+
+ val result = properties.extract[DataMapSpec.BasicProperty]
+ val expected = DataMapSpec.BasicProperty(
+ prop1 = 1,
+ prop2 = "value2",
+ prop3 = List(1,2,3),
+ prop4 = true,
+ prop5 = List("a", "b", "c", "c"),
+ prop6 = 4.56
+ )
+
+ result must beEqualTo(expected)
+ }
+
+ "extract with optional fields" in {
+ val propertiesEmpty = DataMap("""{}""")
+ val propertiesSome = DataMap("""
+ {
+ "prop1" : 1,
+ "prop5" : ["a", "b", "c", "c"],
+ "prop6" : 4.56
+ }
+ """)
+
+ val resultEmpty = propertiesEmpty.extract[DataMapSpec.OptionProperty]
+ val expectedEmpty = DataMapSpec.OptionProperty(
+ prop1 = None,
+ prop2 = None,
+ prop3 = None,
+ prop4 = None,
+ prop5 = None,
+ prop6 = None
+ )
+
+ val resultSome = propertiesSome.extract[DataMapSpec.OptionProperty]
+ val expectedSome = DataMapSpec.OptionProperty(
+ prop1 = Some(1),
+ prop2 = None,
+ prop3 = None,
+ prop4 = None,
+ prop5 = Some(List("a", "b", "c", "c")),
+ prop6 = Some(4.56)
+ )
+
+ resultEmpty must beEqualTo(expectedEmpty)
+ resultSome must beEqualTo(expectedSome)
+ }
+
+ "extract to multi-level object" in {
+      val properties = DataMap("""
+        {
+          "context": {
+            "ip": "1.23.4.56",
+            "prop1": 2.345,
+            "prop2": "value1",
+            "prop4": [1, 2, 3]
+          },
+          "anotherPropertyA": 4.567,
+          "anotherPropertyB": false
+        }
+      """)
+
+ val result = properties.extract[DataMapSpec.MultiLevelProperty]
+ val expected = DataMapSpec.MultiLevelProperty(
+ context = DataMapSpec.Context(
+ ip = "1.23.4.56",
+ prop1 = Some(2.345),
+ prop2 = Some("value1"),
+ prop3 = None,
+ prop4 = List(1,2,3)
+ ),
+ anotherPropertyA = 4.567,
+ anotherPropertyB = false
+ )
+
+ result must beEqualTo(expected)
+ }
+
+ }
+}
+
+object DataMapSpec {
+
+ // define this case class inside object to avoid case class name conflict with other tests
+ case class Context(
+ ip: String,
+ prop1: Option[Double],
+ prop2: Option[String],
+ prop3: Option[Int],
+ prop4: List[Int]
+ )
+
+ case class BasicProperty(
+ prop1: Int,
+ prop2: String,
+ prop3: List[Int],
+ prop4: Boolean,
+ prop5: List[String],
+ prop6: Double
+ )
+
+ case class OptionProperty(
+ prop1: Option[Int],
+ prop2: Option[String],
+ prop3: Option[List[Int]],
+ prop4: Option[Boolean],
+ prop5: Option[List[String]],
+ prop6: Option[Double]
+ )
+
+ case class MultiLevelProperty(
+ context: Context,
+ anotherPropertyA: Double,
+ anotherPropertyB: Boolean
+ )
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
new file mode 100644
index 0000000..8c02186
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/LEventAggregatorSpec.scala
@@ -0,0 +1,103 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.json4s.JObject
+import org.json4s.native.JsonMethods.parse
+
+import org.joda.time.DateTime
+
+class LEventAggregatorSpec extends Specification with TestEvents {
+
+ "LEventAggregator.aggregateProperties()" should {
+
+ "aggregate two entities' properties as DataMap correctly" in {
+ val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+ val result: Map[String, DataMap] =
+ LEventAggregator.aggregateProperties(events.toIterator)
+
+ val expected = Map(
+ "u1" -> DataMap(u1),
+ "u2" -> DataMap(u2)
+ )
+
+ result must beEqualTo(expected)
+ }
+
+ "aggregate two entities' properties as PropertyMap correctly" in {
+ val events = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+ val result: Map[String, PropertyMap] =
+ LEventAggregator.aggregateProperties(events.toIterator)
+
+ val expected = Map(
+ "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+ "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+ )
+
+ result must beEqualTo(expected)
+ }
+
+
+ "aggregate deleted entity correctly" in {
+ val events = Vector(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+ val result = LEventAggregator.aggregateProperties(events.toIterator)
+ val expected = Map(
+ "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+ )
+
+ result must beEqualTo(expected)
+ }
+ }
+
+
+ "LEventAggregator.aggregatePropertiesSingle()" should {
+
+ "aggregate single entity properties as DataMap correctly" in {
+ val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
+ val eventsIt = events.toIterator
+
+ val result: Option[DataMap] = LEventAggregator
+ .aggregatePropertiesSingle(eventsIt)
+ val expected = DataMap(u1)
+
+ result must beEqualTo(Some(expected))
+ }
+
+ "aggregate single entity properties as PropertyMap correctly" in {
+ val events = Vector(u1e5, u1e3, u1e1, u1e4, u1e2)
+ val eventsIt = events.toIterator
+
+ val result: Option[PropertyMap] = LEventAggregator
+ .aggregatePropertiesSingle(eventsIt)
+ val expected = PropertyMap(u1, u1BaseTime, u1LastTime)
+
+ result must beEqualTo(Some(expected))
+ }
+
+ "aggregate deleted entity correctly" in {
+ // put the delete event in the middle
+ val events = Vector(u1e4, u1e2, u1ed, u1e3, u1e1, u1e5)
+ val eventsIt = events.toIterator
+
+ val result = LEventAggregator.aggregatePropertiesSingle(eventsIt)
+
+ result must beEqualTo(None)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
new file mode 100644
index 0000000..0639613
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/LEventsSpec.scala
@@ -0,0 +1,245 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2._
+import org.specs2.specification.Step
+
+class LEventsSpec extends Specification with TestEvents {
+ def is = s2"""
+
+ PredictionIO Storage LEvents Specification
+
+ Events can be implemented by:
+ - HBLEvents ${hbEvents}
+ - JDBCLEvents ${jdbcLEvents}
+
+ """ // root fragment: links each backend name to the fragment defined below
+ // HBase run: the shared `events` fragment against hbDO, then a Step that drops the HBase namespace dbName.
+ def hbEvents = sequential ^ s2"""
+
+ HBLEvents should
+ - behave like any LEvents implementation ${events(hbDO)}
+ - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+ """
+ // JDBC run: the shared `events` fragment against jdbcDO; no separate cleanup step is listed here.
+ def jdbcLEvents = sequential ^ s2"""
+
+ JDBCLEvents should
+ - behave like any LEvents implementation ${events(jdbcDO)}
+
+ """
+
+ val appId = 1 // app ID passed to every eventClient call in this spec
+ // Backend-independent fragment; later steps read what earlier ones wrote (e.g. find/aggregate after insert).
+ def events(eventClient: LEvents) = sequential ^ s2"""
+
+ init default ${initDefault(eventClient)}
+ insert 3 test events and get back by event ID ${insertAndGetEvents(eventClient)}
+ insert 3 test events with timezone and get back by event ID ${insertAndGetTimezone(eventClient)}
+ insert and delete by ID ${insertAndDelete(eventClient)}
+ insert test user events ${insertTestUserEvents(eventClient)}
+ find user events ${findUserEvents(eventClient)}
+ aggregate user properties ${aggregateUserProperties(eventClient)}
+ aggregate one user properties ${aggregateOneUserProperties(eventClient)}
+ aggregate non-existent user properties ${aggregateNonExistentUserProperties(eventClient)}
+ init channel ${initChannel(eventClient)}
+ insert 2 events to channel ${insertChannel(eventClient)}
+ insert 1 event to channel and delete by ID ${insertAndDeleteChannel(eventClient)}
+ find events from channel ${findChannel(eventClient)}
+ remove default ${removeDefault(eventClient)}
+ remove channel ${removeChannel(eventClient)}
+
+ """
+
+ // per-instance name: fixed prefix plus this spec object's hashCode
+ val dbName = "test_pio_storage_events_" + hashCode
+ // LEvents client obtained from the HBase test source
+ def hbDO = Storage.getDataObject[LEvents](StorageTestUtils.hbaseSourceName, dbName)
+ // LEvents client obtained from the JDBC test source
+ def jdbcDO = Storage.getDataObject[LEvents](
+   StorageTestUtils.jdbcSourceName, dbName)
+
+ // Calls init for appId without a channel argument; whatever init returns
+ // becomes the result of this step.
+ def initDefault(eventClient: LEvents) = eventClient.init(appId)
+
+ def insertAndGetEvents(eventClient: LEvents) = {
+   // Round-trip check: insert r1..r3 (from the TestEvents trait), then read
+   // each one back by the ID that insert returned.
+   val listOfEvents = List(r1, r2, r3)
+   val eventIds: List[String] = listOfEvents.map { eventClient.insert(_, appId) }
+
+   // expected: the original events stamped with their assigned IDs
+   val expected: List[Option[Event]] = listOfEvents.zip(eventIds)
+     .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+   // actual: whatever the store hands back for those IDs
+   val fetched = eventIds.map { id => eventClient.get(id, appId) }
+
+   // The fetched events are the value under test, so they go on the left of
+   // `must`; a failure then reports what the store returned against what
+   // was expected instead of the other way round.
+   fetched must containTheSameElementsAs(expected)
+ }
+
+ def insertAndGetTimezone(eventClient: LEvents) = {
+   // Round-trip check (insert, then get by returned ID) for tz1..tz3.
+   val listOfEvents = List(tz1, tz2, tz3)
+   val eventIds: List[String] = listOfEvents.map { eventClient.insert(_, appId) }
+
+   // expected: the original events stamped with their assigned IDs
+   val expected: List[Option[Event]] = listOfEvents.zip(eventIds)
+     .map { case (e, id) => Some(e.copy(eventId = Some(id))) }
+
+   // actual: whatever the store hands back for those IDs
+   val fetched = eventIds.map { id => eventClient.get(id, appId) }
+
+   // Value under test on the left of `must`, expectation inside the matcher,
+   // so failure output is labelled the right way round.
+   fetched must containTheSameElementsAs(expected)
+ }
+
+ def insertAndDelete(eventClient: LEvents) = {
+   // Store calls run in a fixed order: insert, get, delete, get again.
+   val id = eventClient.insert(r2, appId)
+   val beforeDelete = eventClient.get(id, appId)
+   val deleted = eventClient.delete(id, appId)
+   val afterDelete = eventClient.get(id, appId)
+
+   // r2 as it should read back: stamped with the ID insert returned
+   val stored = r2.copy(eventId = Some(id))
+
+   // present before the delete, delete reports true, absent afterwards
+   (beforeDelete must beEqualTo(Some(stored))) and
+     (deleted must beEqualTo(true)) and
+     (afterDelete must beEqualTo(None))
+ }
+
+ def insertTestUserEvents(eventClient: LEvents) = {
+   // u1/u2 events from the TestEvents trait
+   val listOfEvents = Vector(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+   // foreach, not map: the returned event IDs are never used in this step
+   listOfEvents.foreach { eventClient.insert(_, appId) }
+   success
+ }
+
+ def findUserEvents(eventClient: LEvents) = {
+   // must match the events written by insertTestUserEvents
+   val inserted = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2)
+
+   // everything stored under appId with entity type "user"
+   val found = eventClient.find(appId = appId, entityType = Some("user"))
+
+   // ignore event IDs when comparing: reset them to None on every result
+   val withoutIds: List[Event] = found.toList.map(_.copy(eventId = None))
+
+   // same elements, in any order
+   withoutIds must containTheSameElementsAs(inserted)
+ }
+
+ def aggregateUserProperties(eventClient: LEvents) = {
+   // one PropertyMap per user, built from the u1/u2 fixtures and their
+   // BaseTime/LastTime values
+   val perUser = Map(
+     "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+     "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime))
+
+   // aggregate every "user" entity stored under appId
+   val aggregated: Map[String, PropertyMap] =
+     eventClient.aggregateProperties(appId = appId, entityType = "user")
+
+   aggregated must beEqualTo(perUser)
+ }
+
+ def aggregateOneUserProperties(eventClient: LEvents) = {
+   // aggregate only the "user" entity with ID u1
+   val aggregated: Option[PropertyMap] =
+     eventClient.aggregatePropertiesOfEntity(
+       appId = appId, entityType = "user", entityId = "u1")
+
+   val u1Properties = PropertyMap(u1, u1BaseTime, u1LastTime)
+
+   aggregated must beEqualTo(Some(u1Properties))
+ }
+
+ def aggregateNonExistentUserProperties(eventClient: LEvents) = {
+   // u999999 is not among the user IDs (u1, u2) used elsewhere in this spec
+   val missing: Option[PropertyMap] =
+     eventClient.aggregatePropertiesOfEntity(
+       appId = appId, entityType = "user", entityId = "u999999")
+
+   missing must beEqualTo(None)
+ }
+
+ // channel used by all channel-specific steps
+ val channelId = 12
+
+ // Calls init for (appId, channelId); whatever init returns becomes the step result.
+ def initChannel(eventClient: LEvents) = eventClient.init(appId, Some(channelId))
+
+ def insertChannel(eventClient: LEvents) = {
+   // events from TestEvents trait
+   val listOfEvents = List(r4, r5)
+
+   // foreach, not map: the returned event IDs are never used in this step
+   listOfEvents.foreach { eventClient.insert(_, appId, Some(channelId)) }
+
+   success
+ }
+
+ def insertAndDeleteChannel(eventClient: LEvents) = {
+   // every store call below targets this channel
+   val channel = Some(channelId)
+   // fixed order: insert, get, delete, get again
+   val id = eventClient.insert(r2, appId, channel)
+   val beforeDelete = eventClient.get(id, appId, channel)
+   val deleted = eventClient.delete(id, appId, channel)
+   val afterDelete = eventClient.get(id, appId, channel)
+
+   // r2 as it should read back: stamped with the ID insert returned
+   val stored = r2.copy(eventId = Some(id))
+
+   (beforeDelete must beEqualTo(Some(stored))) and
+     (deleted must beEqualTo(true)) and
+     (afterDelete must beEqualTo(None))
+ }
+
+ def findChannel(eventClient: LEvents) = {
+   // r2 also went into this channel, but insertAndDeleteChannel deleted it
+   // again, so only the events written by insertChannel are expected.
+   val inserted = List(r4, r5)
+
+   // everything stored under appId in this channel
+   val found = eventClient.find(appId = appId, channelId = Some(channelId))
+
+   // ignore event IDs when comparing: reset them to None on every result
+   val withoutIds: List[Event] = found.toList.map(_.copy(eventId = None))
+
+   // same elements, in any order
+   withoutIds must containTheSameElementsAs(inserted)
+ }
+
+ // Calls remove for appId without a channel argument; whatever remove
+ // returns becomes the result of this step.
+ def removeDefault(eventClient: LEvents) = eventClient.remove(appId)
+
+ // Calls remove for (appId, channelId); whatever remove returns becomes
+ // the result of this step.
+ def removeChannel(eventClient: LEvents) = eventClient.remove(appId, Some(channelId))
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
new file mode 100644
index 0000000..21790ad
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/PEventAggregatorSpec.scala
@@ -0,0 +1,72 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2.mutable._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+
+ class PEventAggregatorSpec extends Specification with TestEvents {
+
+   // clear these two Spark system properties before the context for this
+   // spec is created
+   System.clearProperty("spark.driver.port")
+   System.clearProperty("spark.hostPort")
+   // local master with four threads
+   val sc = new SparkContext("local[4]", "PEventAggregatorSpec test")
+
+   "PEventAggregator" should {
+
+     "aggregate two entities' properties as DataMap/PropertyMap correctly" in {
+       val eventsRdd = sc.parallelize(
+         Seq(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2))
+
+       // pull the aggregated pairs to the driver as an immutable Map
+       val aggregated =
+         PEventAggregator.aggregateProperties(eventsRdd).collectAsMap.toMap
+
+       val asDataMaps = Map("u1" -> DataMap(u1), "u2" -> DataMap(u2))
+       val asPropertyMaps = Map(
+         "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+         "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime))
+
+       // the same result has to compare equal to both representations
+       aggregated must beEqualTo(asDataMaps)
+       aggregated must beEqualTo(asPropertyMaps)
+     }
+
+     "aggregate deleted entity correctly" in {
+       // put the delete event (u1ed) in the middle
+       val eventsRdd = sc.parallelize(
+         Seq(u1e5, u2e2, u1e3, u1ed, u1e1, u2e3, u2e1, u1e4, u1e2))
+
+       val aggregated =
+         PEventAggregator.aggregateProperties(eventsRdd).collectAsMap.toMap
+
+       // only u2 is expected in the result
+       val survivors = Map("u2" -> PropertyMap(u2, u2BaseTime, u2LastTime))
+
+       aggregated must beEqualTo(survivors)
+     }
+
+   }
+
+   // stop the SparkContext as a step declared after the examples above
+   step(sc.stop())
+ }