Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:37 UTC

[06/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
new file mode 100644
index 0000000..93cbe6e
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/PEventsSpec.scala
@@ -0,0 +1,210 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.specs2._
+import org.specs2.specification.Step
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+class PEventsSpec extends Specification with TestEvents {
+
+  System.clearProperty("spark.driver.port")
+  System.clearProperty("spark.hostPort")
+  val sc = new SparkContext("local[4]", "PEventsSpec test")
+
+  val appId = 1
+  val channelId = 6
+  val dbName = "test_pio_storage_events_" + hashCode
+
+  def hbLocal = Storage.getDataObject[LEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def hbPar = Storage.getDataObject[PEvents](
+    StorageTestUtils.hbaseSourceName,
+    dbName
+  )
+
+  def jdbcLocal = Storage.getDataObject[LEvents](
+    StorageTestUtils.jdbcSourceName,
+    dbName
+  )
+
+  def jdbcPar = Storage.getDataObject[PEvents](
+    StorageTestUtils.jdbcSourceName,
+    dbName
+  )
+
+  def stopSpark = {
+    sc.stop()
+  }
+
+  def is = s2"""
+
+  PredictionIO Storage PEvents Specification
+
+    PEvents can be implemented by:
+    - HBPEvents ${hbPEvents}
+    - JDBCPEvents ${jdbcPEvents}
+    - (stop Spark) ${Step(sc.stop())}
+
+  """
+
+  def hbPEvents = sequential ^ s2"""
+
+    HBPEvents should
+    - behave like any PEvents implementation ${events(hbLocal, hbPar)}
+    - (table cleanup) ${Step(StorageTestUtils.dropHBaseNamespace(dbName))}
+
+  """
+
+  def jdbcPEvents = sequential ^ s2"""
+
+    JDBCPEvents should
+    - behave like any PEvents implementation ${events(jdbcLocal, jdbcPar)}
+    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_$appId"))}
+    - (table cleanup) ${Step(StorageTestUtils.dropJDBCTable(s"${dbName}_${appId}_$channelId"))}
+
+  """
+
+  def events(localEventClient: LEvents, parEventClient: PEvents) = sequential ^ s2"""
+
+    - (init test) ${initTest(localEventClient)}
+    - (insert test events) ${insertTestEvents(localEventClient)}
+    find in default ${find(parEventClient)}
+    find in channel ${findChannel(parEventClient)}
+    aggregate user properties in default ${aggregateUserProperties(parEventClient)}
+    aggregate user properties in channel ${aggregateUserPropertiesChannel(parEventClient)}
+    write to default ${write(parEventClient)}
+    write to channel ${writeChannel(parEventClient)}
+
+  """
+
+  /* setup */
+
+  // events from TestEvents trait
+  val listOfEvents = List(u1e5, u2e2, u1e3, u1e1, u2e3, u2e1, u1e4, u1e2, r1, r2)
+  val listOfEventsChannel = List(u3e1, u3e2, u3e3, r3, r4)
+
+  def initTest(localEventClient: LEvents) = {
+    localEventClient.init(appId)
+    localEventClient.init(appId, Some(channelId))
+  }
+
+  def insertTestEvents(localEventClient: LEvents) = {
+    listOfEvents.map( localEventClient.insert(_, appId) )
+    // insert to channel
+    listOfEventsChannel.map( localEventClient.insert(_, appId, Some(channelId)) )
+    success
+  }
+
+  /* following are tests */
+
+  def find(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map {_.copy(eventId = None)} // ignore eventId
+
+    results must containTheSameElementsAs(listOfEvents)
+  }
+
+  def findChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[Event] = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map {_.copy(eventId = None)} // ignore eventId
+
+    results must containTheSameElementsAs(listOfEventsChannel)
+  }
+
+  def aggregateUserProperties(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u1" -> PropertyMap(u1, u1BaseTime, u1LastTime),
+      "u2" -> PropertyMap(u2, u2BaseTime, u2LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def aggregateUserPropertiesChannel(parEventClient: PEvents) = {
+    val resultRDD: RDD[(String, PropertyMap)] = parEventClient.aggregateProperties(
+      appId = appId,
+      channelId = Some(channelId),
+      entityType = "user"
+    )(sc)
+    val result: Map[String, PropertyMap] = resultRDD.collectAsMap.toMap
+
+    val expected = Map(
+      "u3" -> PropertyMap(u3, u3BaseTime, u3LastTime)
+    )
+
+    result must beEqualTo(expected)
+  }
+
+  def write(parEventClient: PEvents) = {
+    val written = List(r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId)(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map { _.copy(eventId = None)} // ignore eventId
+
+    val expected = listOfEvents ++ written
+
+    results must containTheSameElementsAs(expected)
+  }
+
+  def writeChannel(parEventClient: PEvents) = {
+    val written = List(r1, r5, r6)
+    val writtenRDD = sc.parallelize(written)
+    parEventClient.write(writtenRDD, appId, Some(channelId))(sc)
+
+    // read back
+    val resultRDD = parEventClient.find(
+      appId = appId,
+      channelId = Some(channelId)
+    )(sc)
+
+    val results = resultRDD.collect.toList
+      .map { _.copy(eventId = None)} // ignore eventId
+
+    val expected = listOfEventsChannel ++ written
+
+    results must containTheSameElementsAs(expected)
+  }
+
+}
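
The spec above pins down the PEvents contract: find, aggregateProperties and write each take their storage parameters in a first parameter list and a SparkContext in a second. A minimal driver sketch using only the signatures exercised by the spec (the source name, database name, appId and channelId here are placeholders):

    import org.apache.predictionio.data.storage.{Event, PEvents, PropertyMap, Storage}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext("local[4]", "pevents-usage-sketch")
    val events = Storage.getDataObject[PEvents]("PGSQL", "example_db")

    // Read every event stored for an app's default channel.
    val all: RDD[Event] = events.find(appId = 1)(sc)

    // Fold $set/$unset/$delete events into one PropertyMap per "user" entity.
    val users: RDD[(String, PropertyMap)] =
      events.aggregateProperties(appId = 1, entityType = "user")(sc)

    // Append a batch of events to a named channel.
    events.write(sc.parallelize(Seq.empty[Event]), 1, Some(6))(sc)

    sc.stop()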

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
new file mode 100644
index 0000000..6068f4c
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/StorageTestUtils.scala
@@ -0,0 +1,42 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.apache.predictionio.data.storage.hbase.HBLEvents
+import scalikejdbc._
+
+object StorageTestUtils {
+  val hbaseSourceName = "HBASE"
+  val jdbcSourceName = "PGSQL"
+
+  def dropHBaseNamespace(namespace: String): Unit = {
+    val eventDb = Storage.getDataObject[LEvents](hbaseSourceName, namespace)
+      .asInstanceOf[HBLEvents]
+    val admin = eventDb.client.admin
+    val tableNames = admin.listTableNamesByNamespace(namespace)
+    tableNames.foreach { name =>
+      admin.disableTable(name)
+      admin.deleteTable(name)
+    }
+
+    // Only empty namespaces (no tables) can be removed.
+    admin.deleteNamespace(namespace)
+  }
+
+  def dropJDBCTable(table: String): Unit = DB autoCommit { implicit s =>
+    SQL(s"drop table $table").execute().apply()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/storage/TestEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/storage/TestEvents.scala b/data/src/test/scala/org/apache/predictionio/data/storage/TestEvents.scala
new file mode 100644
index 0000000..f1c327b
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/storage/TestEvents.scala
@@ -0,0 +1,263 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.storage
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+
+trait TestEvents {
+
+  val u1BaseTime = new DateTime(654321)
+  val u2BaseTime = new DateTime(6543210)
+  val u3BaseTime = new DateTime(6543410)
+
+  // u1 events
+  val u1e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u1",
+    properties = DataMap(
+      """{
+        "a" : 1,
+        "b" : "value2",
+        "d" : [1, 2, 3],
+      }"""),
+    eventTime = u1BaseTime
+  )
+
+  val u1e2 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"a" : 2}"""),
+    eventTime = u1BaseTime.plusDays(1)
+  )
+
+  val u1e3 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value4"}"""),
+    eventTime = u1BaseTime.plusDays(2)
+  )
+
+  val u1e4 = u1e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"b" : null}"""),
+    eventTime = u1BaseTime.plusDays(3)
+  )
+
+  val u1e5 = u1e1.copy(
+    event = "$set",
+    properties = DataMap("""{"e" : "new"}"""),
+    eventTime = u1BaseTime.plusDays(4)
+  )
+
+  val u1LastTime = u1BaseTime.plusDays(4)
+  val u1 = """{"a": 2, "d": [1, 2, 3], "e": "new"}"""
+
+  // delete event for u1
+  val u1ed = u1e1.copy(
+    event = "$delete",
+    properties = DataMap(),
+    eventTime = u1BaseTime.plusDays(5)
+  )
+
+  // u2 events
+  val u2e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u2",
+    properties = DataMap(
+      """{
+        "a" : 21,
+        "b" : "value12",
+        "d" : [7, 5, 6],
+      }"""),
+    eventTime = u2BaseTime
+  )
+
+  val u2e2 = u2e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u2BaseTime.plusDays(1)
+  )
+
+  val u2e3 = u2e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value9", "g": "new11"}"""),
+    eventTime = u2BaseTime.plusDays(2)
+  )
+
+  val u2LastTime = u2BaseTime.plusDays(2)
+  val u2 = """{"b": "value9", "d": [7, 5, 6], "g": "new11"}"""
+
+  // u3 events
+  val u3e1 = Event(
+    event = "$set",
+    entityType = "user",
+    entityId = "u3",
+    properties = DataMap(
+      """{
+        "a" : 22,
+        "b" : "value13",
+        "d" : [5, 6, 1],
+      }"""),
+    eventTime = u3BaseTime
+  )
+
+  val u3e2 = u3e1.copy(
+    event = "$unset",
+    properties = DataMap("""{"a" : null}"""),
+    eventTime = u3BaseTime.plusDays(1)
+  )
+
+  val u3e3 = u3e1.copy(
+    event = "$set",
+    properties = DataMap("""{"b" : "value10", "f": "new12", "d" : [1, 3, 2]}"""),
+    eventTime = u3BaseTime.plusDays(2)
+  )
+
+  val u3LastTime = u3BaseTime.plusDays(2)
+  val u3 = """{"b": "value10", "d": [1, 3, 2], "f": "new12"}"""
+
+  // some random events
+  val r1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now,
+    prId = Some("my_prid")
+  )
+  val r2 = Event(
+    event = "my_event2",
+    entityType = "my_entity_type2",
+    entityId = "my_entity_id2"
+  )
+  val r3 = Event(
+    event = "my_event3",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "propA" : 1.2345,
+        "propB" : "valueB",
+      }"""
+    ),
+    prId = Some("my_prid")
+  )
+  val r4 = Event(
+    event = "my_event4",
+    entityType = "my_entity_type4",
+    entityId = "my_entity_id4",
+    targetEntityType = Some("my_target_entity_type4"),
+    targetEntityId = Some("my_target_entity_id4"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""),
+    eventTime = DateTime.now
+  )
+  val r5 = Event(
+    event = "my_event5",
+    entityType = "my_entity_type5",
+    entityId = "my_entity_id5",
+    targetEntityType = Some("my_target_entity_type5"),
+    targetEntityId = Some("my_target_entity_id5"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+  val r6 = Event(
+    event = "my_event6",
+    entityType = "my_entity_type6",
+    entityId = "my_entity_id6",
+    targetEntityType = Some("my_target_entity_type6"),
+    targetEntityId = Some("my_target_entity_id6"),
+    properties = DataMap(
+      """{
+        "prop1" : 6,
+        "prop2" : "value2",
+        "prop3" : [6, 7, 8],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = DateTime.now
+  )
+
+  // timezone
+  val tz1 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id0",
+    targetEntityType = Some("my_target_entity_type"),
+    targetEntityId = Some("my_target_entity_id"),
+    properties = DataMap(
+      """{
+        "prop1" : 1,
+        "prop2" : "value2",
+        "prop3" : [1, 2, 3],
+        "prop4" : true,
+        "prop5" : ["a", "b", "c"],
+        "prop6" : 4.56
+      }"""
+    ),
+    eventTime = new DateTime(12345678, DateTimeZone.forID("-08:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz2 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id1",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+02:00")),
+    prId = Some("my_prid")
+  )
+
+  val tz3 = Event(
+    event = "my_event",
+    entityType = "my_entity_type",
+    entityId = "my_entity_id2",
+    eventTime = new DateTime(12345678, DateTimeZone.forID("+08:00")),
+    prId = Some("my_prid")
+  )
+
+}
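
The expected aggregation results above (u1, u2, u3) follow from a simple fold over each entity's event history in eventTime order: $set adds or overwrites keys, $unset drops keys, and $delete clears the entity. A simplified illustration over plain Scala maps, not the real logic in the aggregateProperties implementations:

    // Simplified stand-in for Event; only the fields the fold needs.
    case class SimpleEvent(event: String, props: Map[String, Any])

    def fold(events: Seq[SimpleEvent]): Map[String, Any] =
      events.foldLeft(Map.empty[String, Any]) { (acc, e) =>
        e.event match {
          case "$set"    => acc ++ e.props      // add or overwrite keys
          case "$unset"  => acc -- e.props.keys // drop the named keys
          case "$delete" => Map.empty           // entity starts over
          case _         => acc                 // non-property events ignored
        }
      }

    // u1e1..u1e5 in eventTime order reduce to
    // Map("a" -> 2, "d" -> List(1, 2, 3), "e" -> "new"),
    // matching the expected u1 JSON above.
    val u1Final = fold(Seq(
      SimpleEvent("$set",   Map("a" -> 1, "b" -> "value2", "d" -> List(1, 2, 3))),
      SimpleEvent("$set",   Map("a" -> 2)),
      SimpleEvent("$set",   Map("b" -> "value4")),
      SimpleEvent("$unset", Map("b" -> null)),
      SimpleEvent("$set",   Map("e" -> "new"))
    ))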

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/webhooks/ConnectorTestUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/webhooks/ConnectorTestUtil.scala b/data/src/test/scala/org/apache/predictionio/data/webhooks/ConnectorTestUtil.scala
new file mode 100644
index 0000000..0998c52
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/webhooks/ConnectorTestUtil.scala
@@ -0,0 +1,47 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks
+
+import org.specs2.execute.Result
+import org.specs2.mutable._
+
+import org.json4s.JObject
+import org.json4s.DefaultFormats
+import org.json4s.native.JsonMethods.parse
+import org.json4s.native.Serialization.write
+
+/** TestUtil for JsonConnector */
+trait ConnectorTestUtil extends Specification {
+
+  implicit val formats = DefaultFormats
+
+  def check(connector: JsonConnector, original: String, event: String): Result = {
+    val originalJson = parse(original).asInstanceOf[JObject]
+    val eventJson = parse(event).asInstanceOf[JObject]
+    // write and parse back to discard any JNothing field
+    val result = parse(write(connector.toEventJson(originalJson))).asInstanceOf[JObject]
+    result.obj must containTheSameElementsAs(eventJson.obj)
+  }
+
+  def check(connector: FormConnector, original: Map[String, String], event: String) = {
+
+    val eventJson = parse(event).asInstanceOf[JObject]
+    // write and parse back to discard any JNothing field
+    val result = parse(write(connector.toEventJson(original))).asInstanceOf[JObject]
+
+    result.obj must containTheSameElementsAs(eventJson.obj)
+  }
+}
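
The two check overloads imply the connector interfaces under test: a JsonConnector turns a JObject payload into Event JSON, and a FormConnector does the same for a flat Map[String, String] form submission. A hypothetical, minimal JsonConnector this utility could exercise, assuming toEventJson(JObject): JObject (the field names are illustrative, not from any real connector):

    import org.json4s._
    import org.json4s.JsonDSL._

    object PassthroughConnector extends JsonConnector {
      implicit val formats = DefaultFormats

      // Rename "userId" to "entityId" and pass "event" through unchanged.
      def toEventJson(data: JObject): JObject =
        ("event" -> (data \ "event").extract[String]) ~
        ("entityType" -> "user") ~
        ("entityId" -> (data \ "userId").extract[String])
    }

    // In a spec mixing in ConnectorTestUtil:
    // check(PassthroughConnector,
    //   """{"event": "do_something", "userId": "u1"}""",
    //   """{"event": "do_something", "entityType": "user", "entityId": "u1"}""")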

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnectorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
new file mode 100644
index 0000000..d99e2ca
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/webhooks/exampleform/ExampleFormConnectorSpec.scala
@@ -0,0 +1,164 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.exampleform
+
+import org.apache.predictionio.data.webhooks.ConnectorTestUtil
+
+import org.specs2.mutable._
+
+/** Test the ExampleFormConnector */
+class ExampleFormConnectorSpec extends Specification with ConnectorTestUtil {
+
+  "ExampleFormConnector" should {
+
+    "convert userAction to Event JSON" in {
+      // webhooks input
+      val userAction = Map(
+        "type" -> "userAction",
+        "userId" -> "as34smg4",
+        "event" -> "do_something",
+        "context[ip]" -> "24.5.68.47", // optional
+        "context[prop1]" -> "2.345", // optional
+        "context[prop2]" -> "value1", // optional
+        "anotherProperty1" -> "100",
+        "anotherProperty2"-> "optional1", // optional
+        "timestamp" -> "2015-01-02T00:30:12.984Z"
+      )
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "properties": {
+            "context": {
+              "ip": "24.5.68.47",
+              "prop1": 2.345
+              "prop2": "value1"
+            },
+            "anotherProperty1": 100,
+            "anotherProperty2": "optional1"
+          }
+          "eventTime": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      check(ExampleFormConnector, userAction, expected)
+    }
+
+    "convert userAction without optional fields to Event JSON" in {
+      // webhooks input
+      val userAction = Map(
+        "type" -> "userAction",
+        "userId" -> "as34smg4",
+        "event" -> "do_something",
+        "anotherProperty1" -> "100",
+        "timestamp" -> "2015-01-02T00:30:12.984Z"
+      )
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "properties": {
+            "anotherProperty1": 100,
+          }
+          "eventTime": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      check(ExampleFormConnector, userAction, expected)
+    }
+
+    "convert userActionItem to Event JSON" in {
+      // webhooks input
+      val userActionItem = Map(
+        "type" -> "userActionItem",
+        "userId" -> "as34smg4",
+        "event" -> "do_something_on",
+        "itemId" -> "kfjd312bc",
+        "context[ip]" -> "1.23.4.56",
+        "context[prop1]" -> "2.345",
+        "context[prop2]" -> "value1",
+        "anotherPropertyA" -> "4.567", // optional
+        "anotherPropertyB" -> "false", // optional
+        "timestamp" -> "2015-01-15T04:20:23.567Z"
+      )
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something_on",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "targetEntityType": "item",
+          "targetEntityId": "kfjd312bc"
+          "properties": {
+            "context": {
+              "ip": "1.23.4.56",
+              "prop1": 2.345
+              "prop2": "value1"
+            },
+            "anotherPropertyA": 4.567
+            "anotherPropertyB": false
+          }
+          "eventTime": "2015-01-15T04:20:23.567Z"
+        }
+      """
+
+      check(ExampleFormConnector, userActionItem, expected)
+    }
+
+    "convert userActionItem without optional fields to Event JSON" in {
+      // webhooks input
+      val userActionItem = Map(
+        "type" -> "userActionItem",
+        "userId" -> "as34smg4",
+        "event" -> "do_something_on",
+        "itemId" -> "kfjd312bc",
+        "context[ip]" -> "1.23.4.56",
+        "context[prop1]" -> "2.345",
+        "context[prop2]" -> "value1",
+        "timestamp" -> "2015-01-15T04:20:23.567Z"
+      )
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something_on",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "targetEntityType": "item",
+          "targetEntityId": "kfjd312bc"
+          "properties": {
+            "context": {
+              "ip": "1.23.4.56",
+              "prop1": 2.345
+              "prop2": "value1"
+            }
+          }
+          "eventTime": "2015-01-15T04:20:23.567Z"
+        }
+      """
+
+      check(ExampleFormConnector, userActionItem, expected)
+    }
+
+  }
+}
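
The form connectors receive their payload as a flat Map[String, String] whose keys encode nesting with brackets ("context[ip]", "context[prop1]"), so part of any FormConnector's job is rebuilding the nested structure. A sketch of that reshaping step, not the actual ExampleFormConnector implementation (which, as the expected JSON above shows, also coerces numeric and boolean strings):

    import org.json4s.JsonAST._

    // "context[prop1]" -> List("context", "prop1")
    def segments(key: String): List[String] =
      key.split("[\\[\\]]").filter(_.nonEmpty).toList

    // Fold a flat form map into nested JObjects, one level per bracket segment.
    // Values stay JStrings here; real connectors also coerce "2.345" or "false".
    def nest(form: Map[String, String]): JObject =
      form.foldLeft(JObject(Nil)) { case (acc, (key, value)) =>
        def place(obj: JObject, path: List[String]): JObject = path match {
          case last :: Nil =>
            JObject(obj.obj :+ (last -> JString(value)))
          case head :: rest =>
            val child = obj.obj
              .collectFirst { case (`head`, o: JObject) => o }
              .getOrElse(JObject(Nil))
            JObject(obj.obj.filterNot(_._1 == head) :+ (head -> place(child, rest)))
          case Nil => obj
        }
        place(acc, segments(key))
      }

    // nest(Map("userId" -> "as34smg4", "context[ip]" -> "24.5.68.47"))
    // == JObject(List("userId" -> JString("as34smg4"),
    //                 "context" -> JObject(List("ip" -> JString("24.5.68.47")))))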

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
new file mode 100644
index 0000000..069d52e
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/webhooks/examplejson/ExampleJsonConnectorSpec.scala
@@ -0,0 +1,179 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.examplejson
+
+import org.apache.predictionio.data.webhooks.ConnectorTestUtil
+
+import org.specs2.mutable._
+
+/** Test the ExampleJsonConnector */
+class ExampleJsonConnectorSpec extends Specification with ConnectorTestUtil {
+
+  "ExampleJsonConnector" should {
+
+    "convert userAction to Event JSON" in {
+      // webhooks input
+      val userAction = """
+        {
+          "type": "userAction"
+          "userId": "as34smg4",
+          "event": "do_something",
+          "context": {
+            "ip": "24.5.68.47",
+            "prop1": 2.345
+            "prop2": "value1"
+          },
+          "anotherProperty1": 100,
+          "anotherProperty2": "optional1",
+          "timestamp": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "properties": {
+            "context": {
+              "ip": "24.5.68.47",
+              "prop1": 2.345
+              "prop2": "value1"
+            },
+            "anotherProperty1": 100,
+            "anotherProperty2": "optional1"
+          }
+          "eventTime": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      check(ExampleJsonConnector, userAction, expected)
+    }
+
+    "convert userAction without optional field to Event JSON" in {
+      // webhooks input
+      val userAction = """
+        {
+          "type": "userAction"
+          "userId": "as34smg4",
+          "event": "do_something",
+          "anotherProperty1": 100,
+          "timestamp": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "properties": {
+            "anotherProperty1": 100,
+          }
+          "eventTime": "2015-01-02T00:30:12.984Z"
+        }
+      """
+
+      check(ExampleJsonConnector, userAction, expected)
+    }
+
+    "convert userActionItem to Event JSON" in {
+      // webhooks input
+      val userActionItem = """
+        {
+          "type": "userActionItem"
+          "userId": "as34smg4",
+          "event": "do_something_on",
+          "itemId": "kfjd312bc",
+          "context": {
+            "ip": "1.23.4.56",
+            "prop1": 2.345
+            "prop2": "value1"
+          },
+          "anotherPropertyA": 4.567
+          "anotherPropertyB": false
+          "timestamp": "2015-01-15T04:20:23.567Z"
+      }
+      """
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something_on",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "targetEntityType": "item",
+          "targetEntityId": "kfjd312bc"
+          "properties": {
+            "context": {
+              "ip": "1.23.4.56",
+              "prop1": 2.345
+              "prop2": "value1"
+            },
+            "anotherPropertyA": 4.567
+            "anotherPropertyB": false
+          }
+          "eventTime": "2015-01-15T04:20:23.567Z"
+        }
+      """
+
+      check(ExampleJsonConnector, userActionItem, expected)
+    }
+
+    "convert userActionItem without optional fields to Event JSON" in {
+      // webhooks input
+      val userActionItem = """
+        {
+          "type": "userActionItem"
+          "userId": "as34smg4",
+          "event": "do_something_on",
+          "itemId": "kfjd312bc",
+          "context": {
+            "ip": "1.23.4.56",
+            "prop1": 2.345
+            "prop2": "value1"
+          }
+          "timestamp": "2015-01-15T04:20:23.567Z"
+      }
+      """
+
+      // expected converted Event JSON
+      val expected = """
+        {
+          "event": "do_something_on",
+          "entityType": "user",
+          "entityId": "as34smg4",
+          "targetEntityType": "item",
+          "targetEntityId": "kfjd312bc"
+          "properties": {
+            "context": {
+              "ip": "1.23.4.56",
+              "prop1": 2.345
+              "prop2": "value1"
+            }
+          }
+          "eventTime": "2015-01-15T04:20:23.567Z"
+        }
+      """
+
+      check(ExampleJsonConnector, userActionItem, expected)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnectorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
new file mode 100644
index 0000000..854c9dd
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/webhooks/mailchimp/MailChimpConnectorSpec.scala
@@ -0,0 +1,254 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.mailchimp
+
+import org.apache.predictionio.data.webhooks.ConnectorTestUtil
+
+import org.specs2.mutable._
+
+class MailChimpConnectorSpec extends Specification with ConnectorTestUtil {
+
+  // TODO: test other events
+  // TODO: test different optional fields
+
+  "MailChimpConnector" should {
+
+    "convert subscribe to event JSON" in {
+
+      val subscribe = Map(
+        "type" -> "subscribe",
+        "fired_at" -> "2009-03-26 21:35:57",
+        "data[id]" -> "8a25ff1d98",
+        "data[list_id]" -> "a6b5da1054",
+        "data[email]" -> "api@mailchimp.com",
+        "data[email_type]" -> "html",
+        "data[merges][EMAIL]" -> "api@mailchimp.com",
+        "data[merges][FNAME]" -> "MailChimp",
+        "data[merges][LNAME]" -> "API",
+        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
+        "data[ip_opt]" -> "10.20.10.30",
+        "data[ip_signup]" -> "10.20.10.30"
+      )
+
+      val expected = """
+        {
+          "event" : "subscribe",
+          "entityType" : "user",
+          "entityId" : "8a25ff1d98",
+          "targetEntityType" : "list",
+          "targetEntityId" : "a6b5da1054",
+          "properties" : {
+            "email" : "api@mailchimp.com",
+            "email_type" : "html",
+            "merges" : {
+              "EMAIL" : "api@mailchimp.com",
+              "FNAME" : "MailChimp",
+              "LNAME" : "API"
+              "INTERESTS" : "Group1,Group2"
+            },
+            "ip_opt" : "10.20.10.30",
+            "ip_signup" : "10.20.10.30"
+          },
+          "eventTime" : "2009-03-26T21:35:57.000Z"
+        }
+      """
+
+      check(MailChimpConnector, subscribe, expected)
+    }
+
+    // check unsubscribe to event JSON
+    "convert unsubscribe to event JSON" in {
+
+      val unsubscribe = Map(
+        "type" -> "unsubscribe",
+        "fired_at" -> "2009-03-26 21:40:57",
+        "data[action]" -> "unsub",
+        "data[reason]" -> "manual",
+        "data[id]" -> "8a25ff1d98",
+        "data[list_id]" -> "a6b5da1054",
+        "data[email]" -> "api+unsub@mailchimp.com",
+        "data[email_type]" -> "html",
+        "data[merges][EMAIL]" -> "api+unsub@mailchimp.com",
+        "data[merges][FNAME]" -> "MailChimp",
+        "data[merges][LNAME]" -> "API",
+        "data[merges][INTERESTS]" -> "Group1,Group2", //optional 
+        "data[ip_opt]" -> "10.20.10.30",
+        "data[campaign_id]" -> "cb398d21d2"
+      )
+
+      val expected = """
+        {
+          "event" : "unsubscribe",
+          "entityType" : "user",
+          "entityId" : "8a25ff1d98",
+          "targetEntityType" : "list",
+          "targetEntityId" : "a6b5da1054",
+          "properties" : {
+            "action" : "unsub",
+            "reason" : "manual",
+            "email" : "api+unsub@mailchimp.com",
+            "email_type" : "html",
+            "merges" : {
+              "EMAIL" : "api+unsub@mailchimp.com",
+              "FNAME" : "MailChimp",
+              "LNAME" : "API"
+              "INTERESTS" : "Group1,Group2"
+            },
+            "ip_opt" : "10.20.10.30",
+            "campaign_id" : "cb398d21d2"
+          },
+          "eventTime" : "2009-03-26T21:40:57.000Z"
+        }
+      """
+
+      check(MailChimpConnector, unsubscribe, expected)
+    }
+
+    // check profile update to event JSON
+    "convert profile update to event JSON" in {
+
+      val profileUpdate = Map(
+        "type" -> "profile",
+        "fired_at" -> "2009-03-26 21:31:21",
+        "data[id]" -> "8a25ff1d98",
+        "data[list_id]" -> "a6b5da1054",
+        "data[email]" -> "api@mailchimp.com",
+        "data[email_type]" -> "html",
+        "data[merges][EMAIL]" -> "api@mailchimp.com",
+        "data[merges][FNAME]" -> "MailChimp",
+        "data[merges][LNAME]" -> "API",
+        "data[merges][INTERESTS]" -> "Group1,Group2", //optional
+        "data[ip_opt]" -> "10.20.10.30"
+      )
+
+      val expected = """
+        {
+          "event" : "profile",
+          "entityType" : "user",
+          "entityId" : "8a25ff1d98",
+          "targetEntityType" : "list",
+          "targetEntityId" : "a6b5da1054",
+          "properties" : {
+            "email" : "api@mailchimp.com",
+            "email_type" : "html",
+            "merges" : {
+              "EMAIL" : "api@mailchimp.com",
+              "FNAME" : "MailChimp",
+              "LNAME" : "API"
+              "INTERESTS" : "Group1,Group2"
+            },
+            "ip_opt" : "10.20.10.30"
+          },
+          "eventTime" : "2009-03-26T21:31:21.000Z"
+        }
+      """
+
+      check(MailChimpConnector, profileUpdate, expected)
+    }
+
+    // check email update to event JSON
+    "convert email update to event JSON" in {
+
+      val emailUpdate = Map(
+        "type" -> "upemail",
+        "fired_at" -> "2009-03-26 22:15:09",
+        "data[list_id]" -> "a6b5da1054",
+        "data[new_id]" -> "51da8c3259",
+        "data[new_email]" -> "api+new@mailchimp.com",
+        "data[old_email]" -> "api+old@mailchimp.com"
+      )
+
+      val expected = """
+        {
+          "event" : "upemail",
+          "entityType" : "user",
+          "entityId" : "51da8c3259",
+          "targetEntityType" : "list",
+          "targetEntityId" : "a6b5da1054",
+          "properties" : {
+            "new_email" : "api+new@mailchimp.com",
+            "old_email" : "api+old@mailchimp.com"
+          },
+          "eventTime" : "2009-03-26T22:15:09.000Z"
+        }
+      """
+
+      check(MailChimpConnector, emailUpdate, expected)
+    }
+
+    // check cleaned email to event JSON
+    "convert cleaned email to event JSON" in {
+
+      val cleanedEmail = Map(
+        "type" -> "cleaned",
+        "fired_at" -> "2009-03-26 22:01:00",
+        "data[list_id]" -> "a6b5da1054",
+        "data[campaign_id]" -> "4fjk2ma9xd",
+        "data[reason]" -> "hard",
+        "data[email]" -> "api+cleaned@mailchimp.com"
+      )
+
+      val expected = """
+        {
+          "event" : "cleaned",
+          "entityType" : "list",
+          "entityId" : "a6b5da1054",
+          "properties" : {
+            "campaignId" : "4fjk2ma9xd",
+            "reason" : "hard",
+            "email" : "api+cleaned@mailchimp.com"
+          },
+          "eventTime" : "2009-03-26T22:01:00.000Z"
+        }
+      """
+
+      check(MailChimpConnector, cleanedEmail, expected)
+    }
+
+    // check campaign sending status to event JSON
+    "convert campaign sending status to event JSON" in {
+
+      val campaign = Map(
+        "type" -> "campaign",
+        "fired_at" -> "2009-03-26 22:15:09",
+        "data[id]" -> "5aa2102003",
+        "data[subject]" -> "Test Campaign Subject",
+        "data[status]" -> "sent",
+        "data[reason]" -> "",
+        "data[list_id]" -> "a6b5da1054"
+      )
+
+      val expected = """
+        {
+          "event" : "campaign",
+          "entityType" : "campaign",
+          "entityId" : "5aa2102003",
+          "targetEntityType" : "list",
+          "targetEntityId" : "a6b5da1054",
+          "properties" : {
+            "subject" : "Test Campaign Subject",
+            "status" : "sent",
+            "reason" : ""
+          },
+          "eventTime" : "2009-03-26T22:15:09.000Z"
+        }
+      """
+
+      check(MailChimpConnector, campaign, expected)
+    }
+
+  }
+}
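
One detail worth noting in the expected JSON above: MailChimp posts fired_at as a zoneless "yyyy-MM-dd HH:mm:ss" string, while the Event JSON carries an ISO-8601 eventTime with milliseconds and a UTC offset ("2009-03-26T21:35:57.000Z"). A sketch of that conversion with Joda-Time, assuming UTC as the expected output suggests:

    import org.joda.time.DateTimeZone
    import org.joda.time.format.{DateTimeFormat, ISODateTimeFormat}

    val firedAtFormat = DateTimeFormat
      .forPattern("yyyy-MM-dd HH:mm:ss")
      .withZone(DateTimeZone.UTC)

    val eventTime = firedAtFormat.parseDateTime("2009-03-26 21:35:57")
    ISODateTimeFormat.dateTime.print(eventTime) // "2009-03-26T21:35:57.000Z"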

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/test/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
----------------------------------------------------------------------
diff --git a/data/src/test/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnectorSpec.scala b/data/src/test/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
new file mode 100644
index 0000000..de92ecd
--- /dev/null
+++ b/data/src/test/scala/org/apache/predictionio/data/webhooks/segmentio/SegmentIOConnectorSpec.scala
@@ -0,0 +1,335 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.webhooks.segmentio
+
+import org.apache.predictionio.data.webhooks.ConnectorTestUtil
+
+import org.specs2.mutable._
+
+class SegmentIOConnectorSpec extends Specification with ConnectorTestUtil {
+
+  // TODO: test different optional fields
+
+  val commonFields =
+    s"""
+       |  "anonymous_id": "id",
+       |  "sent_at": "sendAt",
+       |  "version": "2",
+     """.stripMargin
+
+  "SegmentIOConnector" should {
+
+    "convert group with context to event JSON" in {
+      val context =
+        """
+          |  "context": {
+          |    "app": {
+          |      "name": "InitechGlobal",
+          |      "version": "545",
+          |      "build": "3.0.1.545"
+          |    },
+          |    "campaign": {
+          |      "name": "TPS Innovation Newsletter",
+          |      "source": "Newsletter",
+          |      "medium": "email",
+          |      "term": "tps reports",
+          |      "content": "image link"
+          |    },
+          |    "device": {
+          |      "id": "B5372DB0-C21E-11E4-8DFC-AA07A5B093DB",
+          |      "advertising_id": "7A3CBEA0-BDF5-11E4-8DFC-AA07A5B093DB",
+          |      "ad_tracking_enabled": true,
+          |      "manufacturer": "Apple",
+          |      "model": "iPhone7,2",
+          |      "name": "maguro",
+          |      "type": "ios",
+          |      "token": "ff15bc0c20c4aa6cd50854ff165fd265c838e5405bfeb9571066395b8c9da449"
+          |    },
+          |    "ip": "8.8.8.8",
+          |    "library": {
+          |      "name": "analytics-ios",
+          |      "version": "1.8.0"
+          |    },
+          |    "network": {
+          |      "bluetooth": false,
+          |      "carrier": "T-Mobile NL",
+          |      "cellular": true,
+          |      "wifi": false
+          |    },
+          |    "location": {
+          |      "city": "San Francisco",
+          |      "country": "United States",
+          |      "latitude": 40.2964197,
+          |      "longitude": -76.9411617,
+          |      "speed": 0
+          |    },
+          |    "os": {
+          |      "name": "iPhone OS",
+          |      "version": "8.1.3"
+          |    },
+          |    "referrer": {
+          |      "id": "ABCD582CDEFFFF01919",
+          |      "type": "dataxu"
+          |    },
+          |    "screen": {
+          |      "width": 320,
+          |      "height": 568,
+          |      "density": 2
+          |    },
+          |    "timezone": "Europe/Amsterdam",
+          |    "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)"
+          |  }
+        """.stripMargin
+
+      val group =
+        s"""
+           |{ $commonFields
+            |  "type": "group",
+            |  "group_id": "groupId",
+            |  "user_id": "userIdValue",
+            |  "timestamp" : "2012-12-02T00:30:08.276Z",
+            |  "traits": {
+            |    "name": "groupName",
+            |    "employees": 329,
+            |  },
+            |  $context
+            |}
+        """.stripMargin
+
+      val expected =
+        s"""
+          |{
+          |  "event": "group",
+          |  "entityType": "user",
+          |  "entityId": "userIdValue",
+          |  "properties": {
+          |    $context,
+          |    "group_id": "groupId",
+          |    "traits": {
+          |      "name": "groupName",
+          |      "employees": 329
+          |    },
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, group, expected)
+    }
+
+    "convert group to event JSON" in {
+      val group =
+        s"""
+          |{ $commonFields
+          |  "type": "group",
+          |  "group_id": "groupId",
+          |  "user_id": "userIdValue",
+          |  "timestamp" : "2012-12-02T00:30:08.276Z",
+          |  "traits": {
+          |    "name": "groupName",
+          |    "employees": 329,
+          |  }
+          |}
+        """.stripMargin
+
+      val expected =
+        """
+          |{
+          |  "event": "group",
+          |  "entityType": "user",
+          |  "entityId": "userIdValue",
+          |  "properties": {
+          |    "group_id": "groupId",
+          |    "traits": {
+          |      "name": "groupName",
+          |      "employees": 329
+          |    }
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, group, expected)
+    }
+
+    "convert screen to event JSON" in {
+      val screen =
+        s"""
+          |{ $commonFields
+          |  "type": "screen",
+          |  "name": "screenName",
+          |  "user_id": "userIdValue",
+          |  "timestamp" : "2012-12-02T00:30:08.276Z",
+          |  "properties": {
+          |    "variation": "screenVariation"
+          |  }
+          |}
+        """.stripMargin
+
+      val expected =
+        """
+          |{
+          |  "event": "screen",
+          |  "entityType": "user",
+          |  "entityId": "userIdValue",
+          |  "properties": {
+          |    "properties": {
+          |      "variation": "screenVariation"
+          |    },
+          |    "name": "screenName"
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, screen, expected)
+    }
+
+    "convert page to event JSON" in {
+      val page =
+       s"""
+          |{ $commonFields
+          |  "type": "page",
+          |  "name": "pageName",
+          |  "user_id": "userIdValue",
+          |  "timestamp" : "2012-12-02T00:30:08.276Z",
+          |  "properties": {
+          |    "title": "pageTitle",
+          |    "url": "pageUrl"
+          |  }
+          |}
+        """.stripMargin
+
+      val expected =
+        """
+          |{
+          |  "event": "page",
+          |  "entityType": "user",
+          |  "entityId": "userIdValue",
+          |  "properties": {
+          |    "properties": {
+          |      "title": "pageTitle",
+          |      "url": "pageUrl"
+          |    },
+          |    "name": "pageName"
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, page, expected)
+    }
+
+    "convert alias to event JSON" in {
+      val alias =
+        s"""
+          |{ $commonFields
+          |  "type": "alias",
+          |  "previous_id": "previousIdValue",
+          |  "user_id": "userIdValue",
+          |  "timestamp" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      val expected =
+        """
+          |{
+          |  "event": "alias",
+          |  "entityType": "user",
+          |  "entityId": "userIdValue",
+          |  "properties": {
+          |    "previous_id" : "previousIdValue"
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, alias, expected)
+    }
+
+    "convert track to event JSON" in {
+      val track =
+       s"""
+          |{ $commonFields
+          |  "user_id": "some_user_id",
+          |  "type": "track",
+          |  "event": "Registered",
+          |  "timestamp" : "2012-12-02T00:30:08.276Z",
+          |  "properties": {
+          |    "plan": "Pro Annual",
+          |    "accountType" : "Facebook"
+          |  }
+          |}
+        """.stripMargin
+
+      val expected =
+        """
+          |{
+          |  "event": "track",
+          |  "entityType": "user",
+          |  "entityId": "some_user_id",
+          |  "properties": {
+          |    "event": "Registered",
+          |    "properties": {
+          |      "plan": "Pro Annual",
+          |      "accountType": "Facebook"
+          |    }
+          |  },
+          |  "eventTime" : "2012-12-02T00:30:08.276Z"
+          |}
+        """.stripMargin
+
+      check(SegmentIOConnector, track, expected)
+    }
+
+    "convert identify to event JSON" in {
+      val identify = s"""
+        { $commonFields
+          "type"      : "identify",
+          "user_id"    : "019mr8mf4r",
+          "traits"    : {
+              "email"            : "achilles@segment.com",
+              "name"             : "Achilles",
+              "subscription_plan" : "Premium",
+              "friendCount"      : 29
+          },
+          "timestamp" : "2012-12-02T00:30:08.276Z"
+        }
+      """
+
+      val expected = """
+        {
+          "event" : "identify",
+          "entityType": "user",
+          "entityId" : "019mr8mf4r",
+          "properties" : {
+            "traits" : {
+              "email"            : "achilles@segment.com",
+              "name"             : "Achilles",
+              "subscription_plan" : "Premium",
+              "friendCount"      : 29
+            }
+          },
+          "eventTime" : "2012-12-02T00:30:08.276Z"
+        }
+      """
+
+      check(SegmentIOConnector, identify, expected)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/e2/engine/BinaryVectorizer.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/e2/engine/BinaryVectorizer.scala b/e2/src/main/scala/io/prediction/e2/engine/BinaryVectorizer.scala
deleted file mode 100644
index 6c0d5d3..0000000
--- a/e2/src/main/scala/io/prediction/e2/engine/BinaryVectorizer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.e2.engine
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.linalg.Vector
-import scala.collection.immutable.HashMap
-import scala.collection.immutable.HashSet
-
-class BinaryVectorizer(propertyMap : HashMap[(String, String), Int])
-extends Serializable {
-
-  val properties: Array[(String, String)] = propertyMap.toArray.sortBy(_._2).map(_._1)
-  val numFeatures = propertyMap.size
-
-  override def toString: String = {
-    s"BinaryVectorizer($numFeatures): " + properties.map(e => s"(${e._1}, ${e._2})").mkString(",")
-  }
-
-  def toBinary(map :  Array[(String, String)]) : Vector = {
-    val mapArr : Seq[(Int, Double)] = map.flatMap(
-      e => propertyMap.get(e).map(idx => (idx, 1.0))
-    )
-
-    Vectors.sparse(numFeatures, mapArr)
-  }
-}
-
-
-object BinaryVectorizer {
-  def apply (input : RDD[HashMap[String, String]], properties : HashSet[String])
-  : BinaryVectorizer = {
-    new BinaryVectorizer(HashMap(
-      input.flatMap(identity)
-        .filter(e => properties.contains(e._1))
-        .distinct
-        .collect
-        .zipWithIndex : _*
-    ))
-  }
-
-  def apply(input: Seq[(String, String)]): BinaryVectorizer = {
-    val indexed: Seq[((String, String), Int)] = input.zipWithIndex
-    new BinaryVectorizer(HashMap(indexed:_*))
-  }
-}
-
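
The deleted BinaryVectorizer assigns each known (property, value) pair a column index and encodes an observation as a sparse 0/1 vector. A short usage sketch of its Seq-based factory, grounded in the source above:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}

    // Index three (property, value) pairs; zipWithIndex fixes the column order.
    val vectorizer = BinaryVectorizer(Seq(
      ("gender", "male"),   // column 0
      ("gender", "female"), // column 1
      ("plan", "premium")   // column 2
    ))

    // Pairs found in the index become 1.0; unknown pairs are silently ignored.
    val v: Vector = vectorizer.toBinary(Array(("gender", "female"), ("plan", "premium")))
    // v == Vectors.sparse(3, Seq((1, 1.0), (2, 1.0)))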

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/e2/engine/CategoricalNaiveBayes.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/e2/engine/CategoricalNaiveBayes.scala b/e2/src/main/scala/io/prediction/e2/engine/CategoricalNaiveBayes.scala
deleted file mode 100644
index c598519..0000000
--- a/e2/src/main/scala/io/prediction/e2/engine/CategoricalNaiveBayes.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.e2.engine
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-/**
- * Class for training a naive Bayes model with categorical variables
- */
-object CategoricalNaiveBayes {
-  /**
-   * Train with data points and return the model
-   *
-   * @param points training data points
-   */
-  def train(points: RDD[LabeledPoint]): CategoricalNaiveBayesModel = {
-    val labelCountFeatureLikelihoods = points.map { p =>
-      (p.label, p.features)
-    }.combineByKey[(Long, Array[Map[String, Long]])](
-        createCombiner =
-          (features: Array[String]) => {
-            val featureCounts = features.map { feature =>
-              Map[String, Long]().withDefaultValue(0L).updated(feature, 1L)
-            }
-
-            (1L, featureCounts)
-          },
-        mergeValue =
-          (c: (Long, Array[Map[String, Long]]), features: Array[String]) => {
-            (c._1 + 1L, c._2.zip(features).map { case (m, feature) =>
-              m.updated(feature, m(feature) + 1L)
-            })
-          },
-        mergeCombiners =
-          (
-            c1: (Long, Array[Map[String, Long]]),
-            c2: (Long, Array[Map[String, Long]])) => {
-            val labelCount1 = c1._1
-            val labelCount2 = c2._1
-            val featureCounts1 = c1._2
-            val featureCounts2 = c2._2
-
-            (labelCount1 + labelCount2,
-              featureCounts1.zip(featureCounts2).map { case (m1, m2) =>
-                m1 ++ m2.map { case (k, v) => k -> (v + m1(k))}
-              })
-          }
-      ).mapValues { case (labelCount, featureCounts) =>
-      val featureLikelihoods = featureCounts.map { featureCount =>
-        // mapValues does not return a serializable map
-        featureCount.mapValues(count => math.log(count.toDouble / labelCount))
-          .map(identity)
-      }
-
-      (labelCount, featureLikelihoods)
-    }.collect().toMap
-
-    val noOfPoints = labelCountFeatureLikelihoods.map(_._2._1).sum
-    val priors =
-      labelCountFeatureLikelihoods.mapValues { countFeatureLikelihoods =>
-        math.log(countFeatureLikelihoods._1 / noOfPoints.toDouble)
-      }
-    val likelihoods = labelCountFeatureLikelihoods.mapValues(_._2)
-
-    CategoricalNaiveBayesModel(priors, likelihoods)
-  }
-}
-
-/**
- * Model for naive Bayes classifiers with categorical variables.
- *
- * @param priors log prior probabilities
- * @param likelihoods log likelihood probabilities
- */
-case class CategoricalNaiveBayesModel(
-  priors: Map[String, Double],
-  likelihoods: Map[String, Array[Map[String, Double]]]) extends Serializable {
-
-  val featureCount = likelihoods.head._2.size
-
-  /**
-   * Calculate the log score of having the given features and label
-   *
-   * @param point label and features
-   * @param defaultLikelihood a function that calculates the likelihood when a
-   *                          feature value is not present. The input to the
-   *                          function is the other feature value likelihoods.
-   * @return log score when label is present. None otherwise.
-   */
-  def logScore(
-    point: LabeledPoint,
-    defaultLikelihood: (Seq[Double]) => Double = ls => Double.NegativeInfinity
-    ): Option[Double] = {
-    val label = point.label
-    val features = point.features
-
-    if (!priors.contains(label)) {
-      None
-    } else {
-      Some(logScoreInternal(label, features, defaultLikelihood))
-    }
-  }
-
-  private def logScoreInternal(
-    label: String,
-    features: Array[String],
-    defaultLikelihood: (Seq[Double]) => Double = ls => Double.NegativeInfinity
-    ): Double = {
-
-    val prior = priors(label)
-    val likelihood = likelihoods(label)
-
-    val likelihoodScores = features.zip(likelihood).map {
-      case (feature, featureLikelihoods) =>
-        featureLikelihoods.getOrElse(
-          feature,
-          defaultLikelihood(featureLikelihoods.values.toSeq)
-        )
-    }
-
-    prior + likelihoodScores.sum
-  }
-
-  /**
-   * Return the label that yields the highest score
-   *
-   * @param features features for classification
-   *
-   */
-  def predict(features: Array[String]): String = {
-    priors.keySet.map { label =>
-      (label, logScoreInternal(label, features))
-    }.toSeq
-      .sortBy(_._2)(Ordering.Double.reverse)
-      .take(1)
-      .head
-      ._1
-  }
-}
-
-/**
- * Class that represents the features and labels of a data point.
- *
- * @param label Label of this data point
- * @param features Features of this data point
- */
-case class LabeledPoint(label: String, features: Array[String]) {
-  override def toString: String = {
-    val featuresString = features.mkString("[", ",", "]")
-
-    s"($label, $featuresString)"
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case that: LabeledPoint => that.toString == this.toString
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    this.toString.hashCode
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/e2/engine/MarkovChain.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/e2/engine/MarkovChain.scala b/e2/src/main/scala/io/prediction/e2/engine/MarkovChain.scala
deleted file mode 100644
index 4c992f5..0000000
--- a/e2/src/main/scala/io/prediction/e2/engine/MarkovChain.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.e2.engine
-
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
-import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
-import org.apache.spark.rdd.RDD
-
-/**
- * Class for training a Markov Chain model
- */
-object MarkovChain {
-  /**
-   * Train a Markov Chain model
-   *
-   * @param matrix Tally of all state transitions
-   * @param topN Use the top-N tally for each state
-   */
-  def train(matrix: CoordinateMatrix, topN: Int): MarkovChainModel = {
-    val noOfStates = matrix.numCols().toInt
-    val transitionVectors = matrix.entries
-      .keyBy(_.i.toInt)
-      .groupByKey()
-      .mapValues { rowEntries =>
-        val total = rowEntries.map(_.value).sum
-        val sortedTopN = rowEntries.toSeq
-          .sortBy(_.value)(Ordering.Double.reverse)
-          .take(topN)
-          .map(me => (me.j.toInt, me.value / total))
-          .sortBy(_._1)
-
-        new SparseVector(
-          noOfStates,
-          sortedTopN.map(_._1).toArray,
-          sortedTopN.map(_._2).toArray)
-      }
-
-    new MarkovChainModel(
-      transitionVectors,
-      topN)
-  }
-}
-
-/**
- * Markov Chain model
- *
- * @param transitionVectors transition vectors
- * @param n top N used to construct the model
- */
-case class MarkovChainModel(
-  transitionVectors: RDD[(Int, SparseVector)],
-  n: Int) {
-
-  /**
-   * Calculate the probabilities of the next state
-   *
-   * @param currentState probabilities of the current state
-   */
-  def predict(currentState: Seq[Double]): Seq[Double] = {
-    // multiply the input with transition matrix row by row
-    val nextStateVectors = transitionVectors.map { case (rowIndex, vector) =>
-      val values = vector.indices.map { index =>
-        vector(index) * currentState(rowIndex)
-      }
-
-      Vectors.sparse(currentState.size, vector.indices, values)
-    }.collect()
-
-    // sum up to get the total probabilities
-    (0 until currentState.size).map { index =>
-      nextStateVectors.map { vector =>
-        vector(index)
-      }.sum
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/e2/evaluation/CrossValidation.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/e2/evaluation/CrossValidation.scala b/e2/src/main/scala/io/prediction/e2/evaluation/CrossValidation.scala
deleted file mode 100644
index 8b482bd..0000000
--- a/e2/src/main/scala/io/prediction/e2/evaluation/CrossValidation.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package io.prediction.e2.evaluation
-
-import scala.reflect.ClassTag
-import org.apache.spark.rdd.RDD
-
-/** Common helper functions */
-object CommonHelperFunctions {
-
-  /** Split a data set into evalK folds for cross-validation.
-    * Apply this to data sets supplied to an evaluation.
-    *
-    * @tparam D Data point class.
-    * @tparam TD Training data class.
-    * @tparam EI Evaluation Info class.
-    * @tparam Q Input query class.
-    * @tparam A Actual value class.
-    */
-
-  def splitData[D: ClassTag, TD, EI, Q, A](
-
-     evalK: Int,
-     dataset: RDD[D],
-     evaluatorInfo: EI,
-     trainingDataCreator: RDD[D] => TD,
-     queryCreator: D => Q,
-     actualCreator: D => A): Seq[(TD, EI, RDD[(Q, A)])] = {
-
-    val indexedPoints = dataset.zipWithIndex
-
-    def selectPoint(foldIdx: Int, pt: D, idx: Long, k: Int, isTraining: Boolean): Option[D] = {
-      if ((idx % k == foldIdx) ^ isTraining) Some(pt)
-      else None
-    }
-
-    (0 until evalK).map { foldIdx =>
-      val trainingPoints = indexedPoints.flatMap { case(pt, idx) =>
-        selectPoint(foldIdx, pt, idx, evalK, true)
-      }
-      val testingPoints = indexedPoints.flatMap { case(pt, idx) =>
-        selectPoint(foldIdx, pt, idx, evalK, false)
-      }
-
-      (
-        trainingDataCreator(trainingPoints),
-        evaluatorInfo,
-        testingPoints.map { d => (queryCreator(d), actualCreator(d)) }
-      )
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/e2/package.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/e2/package.scala b/e2/src/main/scala/io/prediction/e2/package.scala
deleted file mode 100644
index 9f5491a..0000000
--- a/e2/src/main/scala/io/prediction/e2/package.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.e2
-
-/** Collection of engine libraries that have no dependency on PredictionIO */
-package object engine {}
-
-/** Collection of evaluation libraries that have no dependency on PredictionIO */
-package object evaluation {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/io/prediction/package.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/io/prediction/package.scala b/e2/src/main/scala/io/prediction/package.scala
deleted file mode 100644
index 9628b5d..0000000
--- a/e2/src/main/scala/io/prediction/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction
-
-/** Independent library of code that is useful for engine development and
-  * evaluation
-  */
-package object e2 {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
new file mode 100644
index 0000000..d831718
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala
@@ -0,0 +1,61 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.e2.engine
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.Vector
+import scala.collection.immutable.HashMap
+import scala.collection.immutable.HashSet
+
+class BinaryVectorizer(propertyMap: HashMap[(String, String), Int])
+  extends Serializable {
+
+  val properties: Array[(String, String)] = propertyMap.toArray.sortBy(_._2).map(_._1)
+  val numFeatures = propertyMap.size
+
+  override def toString: String = {
+    s"BinaryVectorizer($numFeatures): " + properties.map(e => s"(${e._1}, ${e._2})").mkString(",")
+  }
+
+  def toBinary(map: Array[(String, String)]): Vector = {
+    val mapArr: Seq[(Int, Double)] = map.flatMap(
+      e => propertyMap.get(e).map(idx => (idx, 1.0))
+    )
+
+    Vectors.sparse(numFeatures, mapArr)
+  }
+}
+
+
+object BinaryVectorizer {
+  def apply(input: RDD[HashMap[String, String]], properties: HashSet[String])
+    : BinaryVectorizer = {
+    new BinaryVectorizer(HashMap(
+      input.flatMap(identity)
+        .filter(e => properties.contains(e._1))
+        .distinct
+        .collect
+        .zipWithIndex: _*
+    ))
+  }
+
+  def apply(input: Seq[(String, String)]): BinaryVectorizer = {
+    val indexed: Seq[((String, String), Int)] = input.zipWithIndex
+    new BinaryVectorizer(HashMap(indexed: _*))
+  }
+}
+

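For readers skimming this diff, a minimal sketch of how the Seq-based apply
and toBinary above fit together (the property pairs are illustrative data,
not taken from the project's test fixtures):

    import org.apache.predictionio.e2.engine.BinaryVectorizer

    // Index two (property, value) pairs; each pair becomes one column of
    // the output vector, so numFeatures == 2 here.
    val vectorizer = BinaryVectorizer(Seq(("food", "alone"), ("food", "bar")))

    // Only pairs present in the index are set to 1.0; unknown pairs drop out.
    val v = vectorizer.toBinary(Array(("food", "bar"), ("drink", "water")))
    // v == (2,[1],[1.0]) -- one active column out of numFeatures = 2
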
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
new file mode 100644
index 0000000..7944bbc
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala
@@ -0,0 +1,176 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.e2.engine
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/**
+ * Class for training a naive Bayes model with categorical variables
+ */
+object CategoricalNaiveBayes {
+  /**
+   * Train with data points and return the model
+   *
+   * @param points training data points
+   */
+  def train(points: RDD[LabeledPoint]): CategoricalNaiveBayesModel = {
+    val labelCountFeatureLikelihoods = points.map { p =>
+      (p.label, p.features)
+    }.combineByKey[(Long, Array[Map[String, Long]])](
+        createCombiner =
+          (features: Array[String]) => {
+            val featureCounts = features.map { feature =>
+              Map[String, Long]().withDefaultValue(0L).updated(feature, 1L)
+            }
+
+            (1L, featureCounts)
+          },
+        mergeValue =
+          (c: (Long, Array[Map[String, Long]]), features: Array[String]) => {
+            (c._1 + 1L, c._2.zip(features).map { case (m, feature) =>
+              m.updated(feature, m(feature) + 1L)
+            })
+          },
+        mergeCombiners =
+          (
+            c1: (Long, Array[Map[String, Long]]),
+            c2: (Long, Array[Map[String, Long]])) => {
+            val labelCount1 = c1._1
+            val labelCount2 = c2._1
+            val featureCounts1 = c1._2
+            val featureCounts2 = c2._2
+
+            (labelCount1 + labelCount2,
+              featureCounts1.zip(featureCounts2).map { case (m1, m2) =>
+                m1 ++ m2.map { case (k, v) => k -> (v + m1(k))}
+              })
+          }
+      ).mapValues { case (labelCount, featureCounts) =>
+        val featureLikelihoods = featureCounts.map { featureCount =>
+          // mapValues does not return a serializable map
+          featureCount.mapValues(count => math.log(count.toDouble / labelCount))
+            .map(identity)
+        }
+
+        (labelCount, featureLikelihoods)
+      }.collect().toMap
+
+    val noOfPoints = labelCountFeatureLikelihoods.map(_._2._1).sum
+    val priors =
+      labelCountFeatureLikelihoods.mapValues { countFeatureLikelihoods =>
+        math.log(countFeatureLikelihoods._1 / noOfPoints.toDouble)
+      }
+    val likelihoods = labelCountFeatureLikelihoods.mapValues(_._2)
+
+    CategoricalNaiveBayesModel(priors, likelihoods)
+  }
+}
+
+/**
+ * Model for naive Bayes classifiers with categorical variables.
+ *
+ * @param priors log prior probabilities
+ * @param likelihoods log likelihood probabilities
+ */
+case class CategoricalNaiveBayesModel(
+  priors: Map[String, Double],
+  likelihoods: Map[String, Array[Map[String, Double]]]) extends Serializable {
+
+  val featureCount = likelihoods.head._2.size
+
+  /**
+   * Calculate the log score of having the given features and label
+   *
+   * @param point label and features
+   * @param defaultLikelihood a function that calculates the likelihood when a
+   *                          feature value is not present. The input to the
+   *                          function is the other feature value likelihoods.
+   * @return the log score when the label is present, None otherwise.
+   */
+  def logScore(
+    point: LabeledPoint,
+    defaultLikelihood: (Seq[Double]) => Double = ls => Double.NegativeInfinity
+    ): Option[Double] = {
+    val label = point.label
+    val features = point.features
+
+    if (!priors.contains(label)) {
+      None
+    } else {
+      Some(logScoreInternal(label, features, defaultLikelihood))
+    }
+  }
+
+  private def logScoreInternal(
+    label: String,
+    features: Array[String],
+    defaultLikelihood: (Seq[Double]) => Double = ls => Double.NegativeInfinity
+    ): Double = {
+
+    val prior = priors(label)
+    val likelihood = likelihoods(label)
+
+    val likelihoodScores = features.zip(likelihood).map {
+      case (feature, featureLikelihoods) =>
+        featureLikelihoods.getOrElse(
+          feature,
+          defaultLikelihood(featureLikelihoods.values.toSeq)
+        )
+    }
+
+    prior + likelihoodScores.sum
+  }
+
+  /**
+   * Return the label that yields the highest score
+   *
+   * @param features features for classification
+   *
+   */
+  def predict(features: Array[String]): String = {
+    priors.keySet.map { label =>
+      (label, logScoreInternal(label, features))
+    }.toSeq
+      .sortBy(_._2)(Ordering.Double.reverse)
+      .take(1)
+      .head
+      ._1
+  }
+}
+
+/**
+ * Class that represents the features and labels of a data point.
+ *
+ * @param label Label of this data point
+ * @param features Features of this data point
+ */
+case class LabeledPoint(label: String, features: Array[String]) {
+  override def toString: String = {
+    val featuresString = features.mkString("[", ",", "]")
+
+    s"($label, $featuresString)"
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case that: LabeledPoint => that.toString == this.toString
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    this.toString.hashCode
+  }
+
+}

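As a quick orientation to the API above, a minimal sketch of training and
scoring (assumes a live SparkContext named sc; the weather-themed data is
made up for illustration):

    import org.apache.predictionio.e2.engine.{CategoricalNaiveBayes, LabeledPoint}

    val points = sc.parallelize(Seq(
      LabeledPoint("play", Array("sunny", "mild")),
      LabeledPoint("play", Array("overcast", "mild")),
      LabeledPoint("no play", Array("rainy", "cool"))))

    val model = CategoricalNaiveBayes.train(points)

    // Both feature values were observed under "play", so it scores highest.
    val label = model.predict(Array("overcast", "mild"))  // "play"

    // None: the label "away" never appeared in the training data.
    val score = model.logScore(LabeledPoint("away", Array("sunny", "mild")))
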
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
new file mode 100644
index 0000000..41a070d
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala
@@ -0,0 +1,89 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.e2.engine
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
+import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
+import org.apache.spark.rdd.RDD
+
+/**
+ * Class for training a Markov Chain model
+ */
+object MarkovChain {
+  /**
+   * Train a Markov Chain model
+   *
+   * @param matrix Tally of all state transitions
+   * @param topN Use the top-N tally for each state
+   */
+  def train(matrix: CoordinateMatrix, topN: Int): MarkovChainModel = {
+    val noOfStates = matrix.numCols().toInt
+    val transitionVectors = matrix.entries
+      .keyBy(_.i.toInt)
+      .groupByKey()
+      .mapValues { rowEntries =>
+        val total = rowEntries.map(_.value).sum
+        val sortedTopN = rowEntries.toSeq
+          .sortBy(_.value)(Ordering.Double.reverse)
+          .take(topN)
+          .map(me => (me.j.toInt, me.value / total))
+          .sortBy(_._1)
+
+        new SparseVector(
+          noOfStates,
+          sortedTopN.map(_._1).toArray,
+          sortedTopN.map(_._2).toArray)
+      }
+
+    new MarkovChainModel(
+      transitionVectors,
+      topN)
+  }
+}
+
+/**
+ * Markov Chain model
+ *
+ * @param transitionVectors transition vectors
+ * @param n top N used to construct the model
+ */
+case class MarkovChainModel(
+  transitionVectors: RDD[(Int, SparseVector)],
+  n: Int) {
+
+  /**
+   * Calculate the probabilities of the next state
+   *
+   * @param currentState probabilities of the current state
+   */
+  def predict(currentState: Seq[Double]): Seq[Double] = {
+    // multiply the input with transition matrix row by row
+    val nextStateVectors = transitionVectors.map { case (rowIndex, vector) =>
+      val values = vector.indices.map { index =>
+        vector(index) * currentState(rowIndex)
+      }
+
+      Vectors.sparse(currentState.size, vector.indices, values)
+    }.collect()
+
+    // sum up to get the total probabilities
+    (0 until currentState.size).map { index =>
+      nextStateVectors.map { vector =>
+        vector(index)
+      }.sum
+    }
+  }
+}

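A minimal sketch of the transition-matrix plumbing above (again assuming a
live SparkContext sc; the tallies are invented):

    import org.apache.predictionio.e2.engine.MarkovChain
    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // Tally of transitions between two states: state 0 moved to state 1
    // three times, stayed at state 0 once, and so on.
    val entries = sc.parallelize(Seq(
      MatrixEntry(0, 1, 3.0), MatrixEntry(0, 0, 1.0),
      MatrixEntry(1, 0, 2.0), MatrixEntry(1, 1, 2.0)))

    val model = MarkovChain.train(new CoordinateMatrix(entries), topN = 2)

    // Next-state distribution when we are certainly in state 0:
    // row 0 normalizes to (0.25, 0.75).
    val next = model.predict(Seq(1.0, 0.0))  // Seq(0.25, 0.75)
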
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/e2/evaluation/CrossValidation.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/evaluation/CrossValidation.scala b/e2/src/main/scala/org/apache/predictionio/e2/evaluation/CrossValidation.scala
new file mode 100644
index 0000000..d2e1d6a
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/evaluation/CrossValidation.scala
@@ -0,0 +1,64 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.predictionio.e2.evaluation
+
+import scala.reflect.ClassTag
+import org.apache.spark.rdd.RDD
+
+/** Common helper functions */
+object CommonHelperFunctions {
+
+  /** Split a data set into evalK folds for cross-validation.
+    * Apply this to data sets supplied to an evaluation.
+    *
+    * @tparam D Data point class.
+    * @tparam TD Training data class.
+    * @tparam EI Evaluation Info class.
+    * @tparam Q Input query class.
+    * @tparam A Actual value class.
+    */
+
+  def splitData[D: ClassTag, TD, EI, Q, A](
+
+     evalK: Int,
+     dataset: RDD[D],
+     evaluatorInfo: EI,
+     trainingDataCreator: RDD[D] => TD,
+     queryCreator: D => Q,
+     actualCreator: D => A): Seq[(TD, EI, RDD[(Q, A)])] = {
+
+    val indexedPoints = dataset.zipWithIndex
+
+    def selectPoint(foldIdx: Int, pt: D, idx: Long, k: Int, isTraining: Boolean): Option[D] = {
+      if ((idx % k == foldIdx) ^ isTraining) Some(pt)
+      else None
+    }
+
+    (0 until evalK).map { foldIdx =>
+      val trainingPoints = indexedPoints.flatMap { case(pt, idx) =>
+        selectPoint(foldIdx, pt, idx, evalK, true)
+      }
+      val testingPoints = indexedPoints.flatMap { case(pt, idx) =>
+        selectPoint(foldIdx, pt, idx, evalK, false)
+      }
+
+      (
+        trainingDataCreator(trainingPoints),
+        evaluatorInfo,
+        testingPoints.map { d => (queryCreator(d), actualCreator(d)) }
+      )
+    }
+  }
+}

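To make the signature above concrete, a hedged sketch of a 3-fold split
(sc is a live SparkContext; Point and the creator functions are placeholders
for illustration, not library types):

    import org.apache.predictionio.e2.evaluation.CommonHelperFunctions

    case class Point(q: String, a: Int)

    val folds = CommonHelperFunctions.splitData[Point, Seq[Point], String, String, Int](
      evalK = 3,
      dataset = sc.parallelize((1 to 9).map(i => Point(s"q$i", i))),
      evaluatorInfo = "eval info",
      trainingDataCreator = rdd => rdd.collect().toSeq,
      queryCreator = _.q,
      actualCreator = _.a)

    // Three (trainingData, info, (query, actual)) triples; each fold trains
    // on roughly two thirds of the points and tests on the remaining third.
    val (trainingData, info, queryActual) = folds.head
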
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/e2/package.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/e2/package.scala b/e2/src/main/scala/org/apache/predictionio/e2/package.scala
new file mode 100644
index 0000000..c16e521
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/e2/package.scala
@@ -0,0 +1,22 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.e2
+
+/** Collection of engine libraries that have no dependency on PredictionIO */
+package object engine {}
+
+/** Collection of evaluation libraries that have no dependency on PredictionIO */
+package object evaluation {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/main/scala/org/apache/predictionio/package.scala
----------------------------------------------------------------------
diff --git a/e2/src/main/scala/org/apache/predictionio/package.scala b/e2/src/main/scala/org/apache/predictionio/package.scala
new file mode 100644
index 0000000..b480779
--- /dev/null
+++ b/e2/src/main/scala/org/apache/predictionio/package.scala
@@ -0,0 +1,21 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio
+
+/** Independent library of code that is useful for engine development and
+  * evaluation
+  */
+package object e2 {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/e2/src/test/scala/io/prediction/e2/engine/BinaryVectorizerTest.scala
----------------------------------------------------------------------
diff --git a/e2/src/test/scala/io/prediction/e2/engine/BinaryVectorizerTest.scala b/e2/src/test/scala/io/prediction/e2/engine/BinaryVectorizerTest.scala
deleted file mode 100644
index 5e6bc16..0000000
--- a/e2/src/test/scala/io/prediction/e2/engine/BinaryVectorizerTest.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.e2.engine
-
-import io.prediction.e2.fixture.BinaryVectorizerFixture
-import io.prediction.e2.fixture.SharedSparkContext
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.rdd.RDD
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
-import scala.collection.immutable.HashMap
-
-
-import scala.language.reflectiveCalls
-
-class BinaryVectorizerTest extends FlatSpec with Matchers with SharedSparkContext
-  with BinaryVectorizerFixture {
-
-  "toBinary" should "produce the following summed values:" in {
-    val testCase = BinaryVectorizer(sc.parallelize(base.maps), base.properties)
-    val vectorTwoA = testCase.toBinary(testArrays.twoA)
-    val vectorTwoB = testCase.toBinary(testArrays.twoB)
-
-
-    // Make sure vectors produced are the same size.
-    vectorTwoA.size should be (vectorTwoB.size)
-
-    // Test case for checking a food value not listed in base.maps.
-    testCase.toBinary(testArrays.one).toArray.sum should be (1.0)
-
-    // Test cases for making sure indices are preserved.
-    val sumOne = vecSum(vectorTwoA, vectorTwoB)
-
-    exactly (1, sumOne) should be (2.0)
-    exactly (2, sumOne) should be (0.0)
-    exactly (2, sumOne) should be (1.0)
-
-    val sumTwo = vecSum(Vectors.dense(sumOne), testCase.toBinary(testArrays.twoC))
-
-    exactly (3, sumTwo) should be (1.0)
-  }
-
-}