You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:44 UTC

[13/34] incubator-predictionio git commit: rename all except examples

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/LEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/LEventStore.scala b/data/src/main/scala/io/prediction/data/store/LEventStore.scala
deleted file mode 100644
index be543eb..0000000
--- a/data/src/main/scala/io/prediction/data/store/LEventStore.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.store
-
-import io.prediction.data.storage.Storage
-import io.prediction.data.storage.Event
-
-import org.joda.time.DateTime
-
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-
-/** This object provides a set of operations to access the Event Store
-  * without going through Spark's parallelization.
-  */
-object LEventStore {
-  // Default limit on how long find()/findByEntity() wait for a query result.
-  private val defaultTimeout = Duration(60, "seconds")
-  // Event storage accessor from Storage.getLEvents(), created on first use.
-  @transient lazy private val eventsDb = Storage.getLEvents()
-
-  /** Reads events of the specified entity. May use this in Algorithm's predict()
-    * or Serving logic to have fast event store access. Blocks the caller until
-    * the query completes or `timeout` (default 60 seconds) elapses.
-    * @param appName return events of this app
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @param latest Return latest event first (default true)
-    * @return Iterator[Event]
-    */
-  def findByEntity(
-    appName: String,
-    entityType: String,
-    entityId: String,
-    channelName: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    limit: Option[Int] = None,
-    latest: Boolean = true,
-    timeout: Duration = defaultTimeout): Iterator[Event] = {
-    // Resolve the app and channel names to the ids used by the event storage.
-    val (appId, channelId) = Common.appNameToId(appName, channelName)
-    // Run the asynchronous query and wait for it, for at most `timeout`.
-    Await.result(eventsDb.futureFind(
-      appId = appId,
-      channelId = channelId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = Some(entityType),
-      entityId = Some(entityId),
-      eventNames = eventNames,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      limit = limit,
-      reversed = Some(latest)),
-      timeout)
-  }
-
-  /** Reads events generically. If entityType or entityId is not specified, it
-    * results in table scan. Blocks the caller until the query completes or
-    * `timeout` (default 60 seconds) elapses.
-    * @param appName return events of this app
-    * @param entityType return events of this entityType
-    *   - None means no restriction on entityType
-    *   - Some(x) means entityType should match x.
-    * @param entityId return events of this entityId
-    *   - None means no restriction on entityId
-    *   - Some(x) means entityId should match x.
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @return Iterator[Event]
-    */
-  def find(
-    appName: String,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    channelName: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    limit: Option[Int] = None,
-    timeout: Duration = defaultTimeout): Iterator[Event] = {
-    // Resolve the app and channel names to the ids used by the event storage.
-    val (appId, channelId) = Common.appNameToId(appName, channelName)
-    // Run the asynchronous query and wait for it, for at most `timeout`.
-    Await.result(eventsDb.futureFind(
-      appId = appId,
-      channelId = channelId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = entityType,
-      entityId = entityId,
-      eventNames = eventNames,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      limit = limit), timeout)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/PEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/PEventStore.scala b/data/src/main/scala/io/prediction/data/store/PEventStore.scala
deleted file mode 100644
index cd20da9..0000000
--- a/data/src/main/scala/io/prediction/data/store/PEventStore.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.store
-
-import io.prediction.data.storage.Storage
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.PropertyMap
-
-import org.joda.time.DateTime
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** This object provides a set of operations to access the Event Store
-  * with Spark's parallelization.
-  */
-object PEventStore {
-  // Event storage accessor from Storage.getPEvents(), created on first use.
-  @transient lazy private val eventsDb = Storage.getPEvents()
-
-  /** Reads events from the Event Store as an RDD.
-    *
-    * @param appName return events of this app
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param sc Spark context
-    * @return RDD[Event]
-    */
-  def find(
-    appName: String,
-    channelName: Option[String] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None
-  )(sc: SparkContext): RDD[Event] = {
-    // Resolve the app and channel names to the ids used by the event storage.
-    val (appId, channelId) = Common.appNameToId(appName, channelName)
-    // Forward every filter unchanged to the storage accessor.
-    eventsDb.find(
-      appId = appId,
-      channelId = channelId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = entityType,
-      entityId = entityId,
-      eventNames = eventNames,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId
-    )(sc)
-
-  }
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    *
-    * @param appName use events of this app
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param channelName use events of this channel (default channel if it's None)
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param sc Spark context
-    * @return RDD[(String, PropertyMap)] RDD of entityId and PropertyMap pair
-    */
-  def aggregateProperties(
-    appName: String,
-    entityType: String,
-    channelName: Option[String] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    required: Option[Seq[String]] = None)
-    (sc: SparkContext): RDD[(String, PropertyMap)] = {
-      // Resolve the app and channel names to the ids used by the event storage.
-      val (appId, channelId) = Common.appNameToId(appName, channelName)
-
-      eventsDb.aggregateProperties(
-        appId = appId,
-        entityType = entityType,
-        channelId = channelId,
-        startTime = startTime,
-        untilTime = untilTime,
-        required = required
-      )(sc)
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
deleted file mode 100644
index d619f65..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.store.java
-
-import io.prediction.data.storage.Event
-import io.prediction.data.store.LEventStore
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions
-import scala.concurrent.duration.Duration
-
-/** This Java-friendly object provides a set of operations to access the Event
-  * Store without going through Spark's parallelization.
-  */
-object LJavaEventStore {
-
-  /** Reads events of the specified entity. May use this in Algorithm's predict()
-    * or Serving logic to have fast event store access. `timeout` is passed on to
-    * LEventStore.findByEntity together with all the other arguments.
-    * @param appName return events of this app
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @param latest Return latest event first
-    * @return java.util.List[Event]
-    */
-  def findByEntity(
-    appName: String,
-    entityType: String,
-    entityId: String,
-    channelName: Option[String],
-    eventNames: Option[java.util.List[String]],
-    targetEntityType: Option[Option[String]],
-    targetEntityId: Option[Option[String]],
-    startTime: Option[DateTime],
-    untilTime: Option[DateTime],
-    limit: Option[Integer],
-    latest: Boolean,
-    timeout: Duration): java.util.List[Event] = {
-    // Convert the Java list and boxed Integer to the Scala types LEventStore takes.
-    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
-    val limitInt = limit.map(_.intValue())
-    // Delegate to the Scala API and expose the result as a java.util.List.
-    JavaConversions.seqAsJavaList(
-      LEventStore.findByEntity(
-        appName,
-        entityType,
-        entityId,
-        channelName,
-        eventNamesSeq,
-        targetEntityType,
-        targetEntityId,
-        startTime,
-        untilTime,
-        limitInt,
-        latest,
-        timeout
-      ).toSeq)
-  }
-
-  /** Reads events generically. If entityType or entityId is not specified, it
-    * results in table scan. `timeout` is passed on to LEventStore.find together
-    * with all the other arguments.
-    * @param appName return events of this app
-    * @param entityType return events of this entityType
-    *   - None means no restriction on entityType
-    *   - Some(x) means entityType should match x.
-    * @param entityId return events of this entityId
-    *   - None means no restriction on entityId
-    *   - Some(x) means entityId should match x.
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param limit Limit number of events. Get all events if None or Some(-1)
-    * @return java.util.List[Event]
-    */
-  def find(
-    appName: String,
-    entityType: Option[String],
-    entityId: Option[String],
-    channelName: Option[String],
-    eventNames: Option[java.util.List[String]],
-    targetEntityType: Option[Option[String]],
-    targetEntityId: Option[Option[String]],
-    startTime: Option[DateTime],
-    untilTime: Option[DateTime],
-    limit: Option[Integer],
-    timeout: Duration): java.util.List[Event] = {
-    // Convert the Java list and boxed Integer to the Scala types LEventStore takes.
-    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
-    val limitInt = limit.map(_.intValue())
-    // Delegate to the Scala API and expose the result as a java.util.List.
-    JavaConversions.seqAsJavaList(
-      LEventStore.find(
-        appName,
-        entityType,
-        entityId,
-        channelName,
-        eventNamesSeq,
-        targetEntityType,
-        targetEntityId,
-        startTime,
-        untilTime,
-        limitInt,
-        timeout
-      ).toSeq)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala b/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
deleted file mode 100644
index dee608d..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.store.java
-
-/** Helpers that let Java-based engines build Scala `Option` values. */
-object OptionHelper {
-  /** Returns the empty option (`None`), typed as `Option[T]`. */
-  def none[T]: Option[T] = None
-
-  /** Wraps `value` in a `Some`.
-    *
-    * A null `value` is wrapped as it is (giving `Some(null)`); it is not
-    * turned into `None`.
-    */
-  def some[T](value: T): Option[T] = Some(value)
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
deleted file mode 100644
index c0657d2..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.store.java
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.PropertyMap
-import io.prediction.data.store.PEventStore
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaRDD
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions
-
-/** This Java-friendly object provides a set of operations to access the Event
-  * Store with Spark's parallelization.
-  */
-object PJavaEventStore {
-
-  /** Reads events from the Event Store as a JavaRDD.
-    *
-    * @param appName return events of this app
-    * @param channelName return events of this channel (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param entityType return events of this entityType
-    * @param entityId return events of this entityId
-    * @param eventNames return events with any of these event names.
-    * @param targetEntityType return events of this targetEntityType:
-    *   - None means no restriction on targetEntityType
-    *   - Some(None) means no targetEntityType for this event
-    *   - Some(Some(x)) means targetEntityType should match x.
-    * @param targetEntityId return events of this targetEntityId
-    *   - None means no restriction on targetEntityId
-    *   - Some(None) means no targetEntityId for this event
-    *   - Some(Some(x)) means targetEntityId should match x.
-    * @param sc Spark context
-    * @return JavaRDD[Event]
-    */
-  def find(
-    appName: String,
-    channelName: Option[String],
-    startTime: Option[DateTime],
-    untilTime: Option[DateTime],
-    entityType: Option[String],
-    entityId: Option[String],
-    eventNames: Option[java.util.List[String]],
-    targetEntityType: Option[Option[String]],
-    targetEntityId: Option[Option[String]],
-    sc: SparkContext): JavaRDD[Event] = {
-    // PEventStore takes a Scala Seq, so convert the Java list first.
-    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
-    // The RDD result is adapted to the declared JavaRDD return type implicitly.
-    PEventStore.find(
-      appName,
-      channelName,
-      startTime,
-      untilTime,
-      entityType,
-      entityId,
-      eventNamesSeq,
-      targetEntityType,
-      targetEntityId
-    )(sc)
-  }
-
-  /** Aggregate properties of entities based on these special events:
-    * \$set, \$unset, \$delete events.
-    * Every filter, including `required`, is forwarded to PEventStore.aggregateProperties.
-    * @param appName use events of this app
-    * @param entityType aggregate properties of the entities of this entityType
-    * @param channelName use events of this channel (default channel if it's None)
-    * @param startTime use events with eventTime >= startTime
-    * @param untilTime use events with eventTime < untilTime
-    * @param required only keep entities with these required properties defined
-    * @param sc Spark context
-    * @return JavaRDD[(String, PropertyMap)] JavaRDD of entityId and PropertyMap pair
-    */
-  def aggregateProperties(
-    appName: String,
-    entityType: String,
-    channelName: Option[String],
-    startTime: Option[DateTime],
-    untilTime: Option[DateTime],
-    required: Option[java.util.List[String]],
-    sc: SparkContext): JavaRDD[(String, PropertyMap)] = {
-    val requiredSeq = required.map(JavaConversions.asScalaBuffer(_).toSeq)
-    PEventStore.aggregateProperties(
-      appName,
-      entityType,
-      channelName,
-      startTime,
-      untilTime,
-      required = requiredSeq)(sc)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/package.scala b/data/src/main/scala/io/prediction/data/store/package.scala
deleted file mode 100644
index 4856416..0000000
--- a/data/src/main/scala/io/prediction/data/store/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data
-
-/** Provides high-level interfaces to the Event Store for use from within a
-  * prediction engine.
-  */
-package object store {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/DataView.scala b/data/src/main/scala/io/prediction/data/view/DataView.scala
deleted file mode 100644
index 52a67fd..0000000
--- a/data/src/main/scala/io/prediction/data/view/DataView.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.view
-
-import io.prediction.annotation.Experimental
-import io.prediction.data.storage.Event
-
-import grizzled.slf4j.Logger
-import io.prediction.data.store.PEventStore
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SQLContext
-import org.joda.time.DateTime
-
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe._
-import scala.util.hashing.MurmurHash3
-
-/**
- * :: Experimental :: Builds DataFrames of converted events, cached as Parquet files.
- */
-@Experimental
-object DataView {
-  /**
-    * :: Experimental ::
-    *
-    * Create a DataFrame from events of a specified app.
-    * The DataFrame is cached as a Parquet file in the `view` directory of PIO_FS_BASEDIR.
-    * @param appName return events of this app
-    * @param channelName use events of this channel (default channel if it's None)
-    * @param startTime return events with eventTime >= startTime
-    * @param untilTime return events with eventTime < untilTime
-    * @param conversionFunction a function that turns raw Events into events of interest.
-    *                           If conversionFunction returns None, such events are dropped.
-    * @param name identify the DataFrame created
-    * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
-    *                and update whenever the function is changed.
-    * @param sqlContext SQL context
-    * @tparam E the output type of the conversion function. The type needs to extend Product
-    *           (e.g. case class)
-    * @return a DataFrame of events
-    */
-  @Experimental
-  def create[E <: Product: TypeTag: ClassTag](
-    appName: String,
-    channelName: Option[String] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    conversionFunction: Event => Option[E],
-    name: String = "",
-    version: String = "")(sqlContext: SQLContext): DataFrame = {
-    @transient lazy val logger = Logger[this.type]
-    val sc = sqlContext.sparkContext
-
-    // Time bounds used in the cache key. `endTime` is fixed once so that the key
-    // and the query below use the same upper bound.
-    // NOTE(review): with untilTime = None the key contains the current time, so a
-    // later call will presumably not reuse this cache file -- confirm the intent.
-    val beginTime = startTime.getOrElse(new DateTime(0L))
-    val endTime = untilTime.getOrElse(DateTime.now())
-    // detect changes to the case class
-    val uid = java.io.ObjectStreamClass.lookup(implicitly[reflect.ClassTag[E]].runtimeClass)
-        .getSerialVersionUID
-    // A non-default channel is part of the cache key, so views of different channels
-    // of one app never share a cache file. The default channel (None) adds nothing,
-    // which keeps the keys of existing default-channel cache files unchanged.
-    val channelKey = channelName.fold("")(c => s"-$c")
-    val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid$channelKey")
-    val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view"
-    val fileName = s"$baseDir/$name-$appName-$hash.parquet"
-    try {
-      sqlContext.parquetFile(fileName)
-    } catch {
-      case e: java.io.FileNotFoundException =>
-        logger.info("Cached copy not found, reading from DB.")
-        // No cached copy yet: read the events, convert them and write the cache.
-        val result: RDD[E] = PEventStore.find(
-            appName = appName,
-            channelName = channelName,
-            startTime = startTime,
-            untilTime = Some(endTime))(sc)
-          .flatMap((e) => conversionFunction(e))
-        import sqlContext.implicits._ // needed for RDD.toDF()
-        val resultDF = result.toDF()
-
-        resultDF.saveAsParquetFile(fileName)
-        sqlContext.parquetFile(fileName)
-      case e: java.lang.RuntimeException =>
-        if (e.toString.contains("is not a Parquet file")) {
-          logger.error(s"$fileName does not contain a valid Parquet file. " +
-            "Please delete it and try again.")
-        }
-        throw e
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/LBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/LBatchView.scala b/data/src/main/scala/io/prediction/data/view/LBatchView.scala
deleted file mode 100644
index f806056..0000000
--- a/data/src/main/scala/io/prediction/data/view/LBatchView.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import org.joda.time.DateTime
-import scala.language.implicitConversions
-
-import scala.concurrent.ExecutionContext.Implicits.global // TODO
-
-@deprecated("Use LEvents or LEventStore instead.", "0.9.2")
-object ViewPredicates {
-  def getStartTimePredicate(startTimeOpt: Option[DateTime])
-  : (Event => Boolean) = {
-    startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true)
-  }
-
-  def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = {
-    e => (!(e.eventTime.isBefore(startTime) || e.eventTime.isEqual(startTime)))
-  }
-
-  def getUntilTimePredicate(untilTimeOpt: Option[DateTime])
-  : (Event => Boolean) = {
-    untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true)
-  }
-
-  def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = {
-    _.eventTime.isBefore(untilTime)
-  }
-
-  def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean)
-  = {
-    entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true)
-  }
-
-  def getEntityTypePredicate(entityType: String): (Event => Boolean) = {
-    (_.entityType == entityType)
-  }
-
-  def getEventPredicate(eventOpt: Option[String]): (Event => Boolean)
-  = {
-    eventOpt.map(getEventPredicate).getOrElse(_ => true)
-  }
-
-  def getEventPredicate(event: String): (Event => Boolean) = {
-    (_.event == event)
-  }
-}
-
-@deprecated("Use LEvents instead.", "0.9.2")
-object ViewAggregators {
-  def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = {
-    (p, e) => {
-      e.event match {
-        case "$set" => {
-          if (p == None) {
-            Some(e.properties)
-          } else {
-            p.map(_ ++ e.properties)
-          }
-        }
-        case "$unset" => {
-          if (p == None) {
-            None
-          } else {
-            p.map(_ -- e.properties.keySet)
-          }
-        }
-        case "$delete" => None
-        case _ => p // do nothing for others
-      }
-    }
-  }
-}
-
-@deprecated("Use LEvents instead.", "0.9.2")
-object EventSeq {
-  // Need to
-  // >>> import scala.language.implicitConversions
-  // to enable implicit conversion. Only import in the code where this is
-  // necessary to avoid confusion.
-  implicit def eventSeqToList(es: EventSeq): List[Event] = es.events
-  implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l)
-}
-
-
-@deprecated("Use LEvents instead.", "0.9.2")
-class EventSeq(val events: List[Event]) {
-  def filter(
-    eventOpt: Option[String] = None,
-    entityTypeOpt: Option[String] = None,
-    startTimeOpt: Option[DateTime] = None,
-    untilTimeOpt: Option[DateTime] = None): EventSeq = {
-
-    events
-    .filter(ViewPredicates.getEventPredicate(eventOpt))
-    .filter(ViewPredicates.getStartTimePredicate(startTimeOpt))
-    .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt))
-    .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt))
-  }
-
-  def filter(p: (Event => Boolean)): EventSeq = events.filter(p)
-
-  def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T)
-  : Map[String, T] = {
-    events
-    .groupBy( _.entityId )
-    .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
-    .toMap
-  }
-
-
-}
-
-
-@deprecated("Use LEventStore instead.", "0.9.2")
-class LBatchView(
-  val appId: Int,
-  val startTime: Option[DateTime],
-  val untilTime: Option[DateTime]) {
-
-  @transient lazy val eventsDb = Storage.getLEvents()
-
-  @transient lazy val _events = eventsDb.find(
-    appId = appId,
-    startTime = startTime,
-    untilTime = untilTime).toList
-
-  @transient lazy val events: EventSeq = new EventSeq(_events)
-
-  /* Aggregate event data
-   *
-   * @param entityType only aggregate event with entityType
-   * @param startTimeOpt if specified, only aggregate event after (inclusive)
-   * startTimeOpt
-   * @param untilTimeOpt if specified, only aggregate event until (exclusive)
-   * untilTimeOpt
-   */
-  def aggregateProperties(
-      entityType: String,
-      startTimeOpt: Option[DateTime] = None,
-      untilTimeOpt: Option[DateTime] = None
-      ): Map[String, DataMap] = {
-
-    events
-    .filter(entityTypeOpt = Some(entityType))
-    .filter(e => EventValidation.isSpecialEvents(e.event))
-    .aggregateByEntityOrdered(
-      init = None,
-      op = ViewAggregators.getDataMapAggregator())
-    .filter{ case (k, v) => (v != None) }
-    .mapValues(_.get)
-
-  }
-
-  /*
-  def aggregateByEntityOrdered[T](
-    predicate: Event => Boolean,
-    init: T,
-    op: (T, Event) => T): Map[String, T] = {
-
-    _events
-      .filter( predicate(_) )
-      .groupBy( _.entityId )
-      .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
-      .toMap
-
-  }
-  */
-
-  /*
-  def groupByEntityOrdered[T](
-    predicate: Event => Boolean,
-    map: Event => T): Map[String, Seq[T]] = {
-
-    _events
-      .filter( predicate(_) )
-      .groupBy( _.entityId )
-      .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) )
-      .toMap
-  }
-  */
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/PBatchView.scala b/data/src/main/scala/io/prediction/data/view/PBatchView.scala
deleted file mode 100644
index 5b0f878..0000000
--- a/data/src/main/scala/io/prediction/data/view/PBatchView.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.hbase.HBPEvents
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-// each JValue data associated with the time it is set
-private[prediction] case class PropTime(val d: JValue, val t: Long) extends Serializable
-
-private[prediction] case class SetProp (
-  val fields: Map[String, PropTime],
-  // last set time. Note: fields could be empty with valid set time
-  val t: Long) extends Serializable {
-
-  def ++ (that: SetProp): SetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, PropTime] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData.t > thatData.t) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    // keep the latest set time
-    val combinedT = if (this.t > that.t) this.t else that.t
-
-    SetProp(
-      fields = combinedFields,
-      t = combinedT
-    )
-  }
-}
-
-private[prediction] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
-  def ++ (that: UnsetProp): UnsetProp = {
-    val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
-    val common: Map[String, Long] = commonKeys.map { k =>
-      val thisData = this.fields(k)
-      val thatData = that.fields(k)
-      // only keep the value with latest time
-      val v = if (thisData > thatData) thisData else thatData
-      (k, v)
-    }.toMap
-
-    val combinedFields = common ++
-      (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
-    UnsetProp(
-      fields = combinedFields
-    )
-  }
-}
-
-private[prediction] case class DeleteEntity (t: Long) extends Serializable {
-  def ++ (that: DeleteEntity): DeleteEntity = {
-    if (this.t > that.t) this else that
-  }
-}
-
-private[prediction] case class EventOp (
-  val setProp: Option[SetProp] = None,
-  val unsetProp: Option[UnsetProp] = None,
-  val deleteEntity: Option[DeleteEntity] = None
-) extends Serializable {
-
-  def ++ (that: EventOp): EventOp = {
-    EventOp(
-      setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
-      unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
-      deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
-    )
-  }
-
-  def toDataMap(): Option[DataMap] = {
-    setProp.flatMap { set =>
-
-      val unsetKeys: Set[String] = unsetProp.map( unset =>
-        unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
-      ).getOrElse(Set())
-
-      val combinedFields = deleteEntity.map { delete =>
-        if (delete.t >= set.t) {
-          None
-        } else {
-          val deleteKeys: Set[String] = set.fields
-            .filter { case (k, PropTime(kv, t)) =>
-              (delete.t >= t)
-            }.keySet
-          Some(set.fields -- unsetKeys -- deleteKeys)
-        }
-      }.getOrElse{
-        Some(set.fields -- unsetKeys)
-      }
-
-      // Note: mapValues() doesn't return concrete Map and causes
-      // NotSerializableException issue. Use map(identity) to work around this.
-      // see https://issues.scala-lang.org/browse/SI-7005
-      combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
-    }
-  }
-
-}
-
-private[prediction] object EventOp {
-  def apply(e: Event): EventOp = {
-    val t = e.eventTime.getMillis
-    e.event match {
-      case "$set" => {
-        val fields = e.properties.fields.mapValues(jv =>
-          PropTime(jv, t)
-        ).map(identity)
-
-        EventOp(
-          setProp = Some(SetProp(fields = fields, t = t))
-        )
-      }
-      case "$unset" => {
-        val fields = e.properties.fields.mapValues(jv => t).map(identity)
-        EventOp(
-          unsetProp = Some(UnsetProp(fields = fields))
-        )
-      }
-      case "$delete" => {
-        EventOp(
-          deleteEntity = Some(DeleteEntity(t))
-        )
-      }
-      case _ => {
-        EventOp()
-      }
-    }
-  }
-}
-
-@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
-class PBatchView(
-  val appId: Int,
-  val startTime: Option[DateTime],
-  val untilTime: Option[DateTime],
-  val sc: SparkContext) {
-
-  // NOTE: parallel Events DB interface
-  @transient lazy val eventsDb = Storage.getPEvents()
-
-  @transient lazy val _events: RDD[Event] =
-    eventsDb.getByAppIdAndTimeAndEntity(
-      appId = appId,
-      startTime = startTime,
-      untilTime = untilTime,
-      entityType = None,
-      entityId = None)(sc)
-
-  // TODO: change to use EventSeq?
-  @transient lazy val events: RDD[Event] = _events
-
-  def aggregateProperties(
-    entityType: String,
-    startTimeOpt: Option[DateTime] = None,
-    untilTimeOpt: Option[DateTime] = None
-  ): RDD[(String, DataMap)] = {
-
-    _events
-      .filter( e => ((e.entityType == entityType) &&
-        (EventValidation.isSpecialEvents(e.event))) )
-      .map( e => (e.entityId, EventOp(e) ))
-      .aggregateByKey[EventOp](EventOp())(
-        // within same partition
-        seqOp = { case (u, v) => u ++ v },
-        // across partition
-        combOp = { case (accu, u) => accu ++ u }
-      )
-      .mapValues(_.toDataMap)
-      .filter{ case (k, v) => v.isDefined }
-      .map{ case (k, v) => (k, v.get) }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/QuickTest.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/QuickTest.scala b/data/src/main/scala/io/prediction/data/view/QuickTest.scala
deleted file mode 100644
index 68ade1d..0000000
--- a/data/src/main/scala/io/prediction/data/view/QuickTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.LEvents
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import scala.concurrent.ExecutionContext.Implicits.global // TODO
-
-import grizzled.slf4j.Logger
-import org.joda.time.DateTime
-
-import scala.language.implicitConversions
-
-class TestHBLEvents() {
-  @transient lazy val eventsDb = Storage.getLEvents()
-
-  def run(): Unit = {
-    val r = eventsDb.find(
-      appId = 1,
-      startTime = None,
-      untilTime = None,
-      entityType = Some("pio_user"),
-      entityId = Some("3")).toList
-    println(r)
-  }
-}
-
-class TestSource(val appId: Int) {
-  @transient lazy val logger = Logger[this.type]
-  @transient lazy val batchView = new LBatchView(appId,
-    None, None)
-
-  def run(): Unit = {
-    println(batchView.events)
-  }
-}
-
-object QuickTest {
-
-  def main(args: Array[String]) {
-    val t = new TestHBLEvents()
-    t.run()
-
-    // val ts = new TestSource(args(0).toInt)
-    // ts.run()
-  }
-}
-
-object TestEventTime {
-  @transient lazy val batchView = new LBatchView(9, None, None)
-
-  // implicit def back2list(es: EventSeq) = es.events
-
-  def main(args: Array[String]) {
-    val e = batchView.events.filter(
-      eventOpt = Some("rate"),
-      startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0))
-      // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0))
-    )
-      // untilTimeOpt = Some(new DateTime(2000, 1, 1, 0, 0)))
-
-    e.foreach { println }
-    println()
-    println()
-    println()
-    val u = batchView.aggregateProperties("pio_item")
-    u.foreach { println }
-    println()
-    println()
-    println()
-
-    // val l: Seq[Event] = e
-    val l = e.map { _.entityId }
-    l.foreach { println }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
deleted file mode 100644
index 0b64afb..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks
-
-/** Webhooks Connector Exception
-  *
-  * @param message the detail message
-  * @param cause the cause
-  */
-private[prediction] class ConnectorException(message: String, cause: Throwable)
-  extends Exception(message, cause) {
-
-  /** Webhooks Connector Exception with cause being set to null
-    *
-    * @param message the detail message
-    */
-  def this(message: String) = this(message, null)
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
deleted file mode 100644
index 424b6ba..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-
-private[prediction] object ConnectorUtil {
-
-  implicit val eventJson4sFormats: Formats = DefaultFormats +
-    new EventJson4sSupport.APISerializer
-
-  // Intentionally use EventJson4sSupport.APISerializer to convert
-  // from JSON to an Event object. Don't allow connectors to create
-  // Event objects directly, so that Event object formation is kept
-  // consistent by enforcing the JSON format.
-
-  def toEvent(connector: JsonConnector, data: JObject): Event = {
-    read[Event](write(connector.toEventJson(data)))
-  }
-
-  def toEvent(connector: FormConnector, data: Map[String, String]): Event = {
-    read[Event](write(connector.toEventJson(data)))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
deleted file mode 100644
index 9087f31..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks
-
-import org.json4s.JObject
-
-/** Connector for Webhooks connection with Form submission data format
-  */
-private[prediction] trait FormConnector {
-
-  // TODO: support conversion to multiple events?
-
-  /** Convert from original Form submission data to Event JObject
-    * @param data Map of key-value pairs in String type received through webhooks
-    * @return Event JObject
-   */
-  def toEventJson(data: Map[String, String]): JObject
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
deleted file mode 100644
index e0e80fe..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks
-
-import org.json4s.JObject
-
-/** Connector for Webhooks connection */
-private[prediction] trait JsonConnector {
-
-  // TODO: support conversion to multiple events?
-
-  /** Convert from original JObject to Event JObject
-    * @param data original JObject received through webhooks
-    * @return Event JObject
-   */
-  def toEventJson(data: JObject): JObject
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
deleted file mode 100644
index f19e009..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.exampleform
-
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorException
-
-import org.json4s.JObject
-
-
-/** Example FormConnector with following types of webhook form data inputs:
-  *
-  * UserAction
-  *
-  *   "type"="userAction"
-  *   "userId"="as34smg4",
-  *   "event"="do_something",
-  *   "context[ip]"="24.5.68.47", // optional
-  *   "context[prop1]"="2.345", // optional
-  *   "context[prop2]"="value1" // optional
-  *   "anotherProperty1"="100",
-  *   "anotherProperty2"="optional1", // optional
-  *   "timestamp"="2015-01-02T00:30:12.984Z"
-  *
-  * UserActionItem
-  *
-  *   "type"="userActionItem"
-  *   "userId"="as34smg4",
-  *   "event"="do_something_on",
-  *   "itemId"="kfjd312bc",
-  *   "context[ip]"="1.23.4.56",
-  *   "context[prop1]"="2.345",
-  *   "context[prop2]"="value1",
-  *   "anotherPropertyA"="4.567", // optional
-  *   "anotherPropertyB"="false", // optional
-  *   "timestamp"="2015-01-15T04:20:23.567Z"
-  *
-  */
-private[prediction] object ExampleFormConnector extends FormConnector {
-
-  override
-  def toEventJson(data: Map[String, String]): JObject = {
-    val json = try {
-      data.get("type") match {
-        case Some("userAction") => userActionToEventJson(data)
-        case Some("userActionItem") => userActionItemToEventJson(data)
-        case Some(x) => throw new ConnectorException(
-          s"Cannot convert unknown type ${x} to event JSON")
-        case None => throw new ConnectorException(
-          s"The field 'type' is required.")
-      }
-    } catch {
-      case e: ConnectorException => throw e
-      case e: Exception => throw new ConnectorException(
-        s"Cannot convert ${data} to event JSON. ${e.getMessage()}", e)
-    }
-    json
-  }
-
-  def userActionToEventJson(data: Map[String, String]): JObject = {
-    import org.json4s.JsonDSL._
-
-    // two level optional data
-    val context = if (data.exists(_._1.startsWith("context["))) {
-      Some(
-        ("ip" -> data.get("context[ip]")) ~
-        ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~
-        ("prop2" -> data.get("context[prop2]"))
-      )
-    } else {
-      None
-    }
-
-    val json =
-      ("event" -> data("event")) ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("userId")) ~
-      ("eventTime" -> data("timestamp")) ~
-      ("properties" -> (
-        ("context" -> context) ~
-        ("anotherProperty1" -> data("anotherProperty1").toInt) ~
-        ("anotherProperty2" -> data.get("anotherProperty2"))
-      ))
-    json
-  }
-
-
-  def userActionItemToEventJson(data: Map[String, String]): JObject = {
-    import org.json4s.JsonDSL._
-
-    val json =
-      ("event" -> data("event")) ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("userId")) ~
-      ("targetEntityType" -> "item") ~
-      ("targetEntityId" -> data("itemId")) ~
-      ("eventTime" -> data("timestamp")) ~
-      ("properties" -> (
-        ("context" -> (
-          ("ip" -> data("context[ip]")) ~
-          ("prop1" -> data("context[prop1]").toDouble) ~
-          ("prop2" -> data("context[prop2]"))
-        )) ~
-        ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~
-        ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean))
-      ))
-    json
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
deleted file mode 100644
index 4d4b991..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.examplejson
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.ConnectorException
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-
-/** Example JsonConnector with following types of webhooks JSON input:
-  *
-  * UserAction
-  *
-  * {
-  *   "type": "userAction",
-  *   "userId": "as34smg4",
-  *   "event": "do_something",
-  *   "context": {
-  *     "ip": "24.5.68.47",
-  *     "prop1": 2.345,
-  *     "prop2": "value1"
-  *   },
-  *   "anotherProperty1": 100,
-  *   "anotherProperty2": "optional1",
-  *   "timestamp": "2015-01-02T00:30:12.984Z"
-  * }
-  *
-  * UserActionItem
-  *
-  * {
-  *   "type": "userActionItem",
-  *   "userId": "as34smg4",
-  *   "event": "do_something_on",
-  *   "itemId": "kfjd312bc",
-  *   "context": {
-  *     "ip": "1.23.4.56",
-  *     "prop1": 2.345,
-  *     "prop2": "value1"
-  *   },
-  *   "anotherPropertyA": 4.567,
-  *   "anotherPropertyB": false,
-  *   "timestamp": "2015-01-15T04:20:23.567Z"
-  * }
-  */
-private[prediction] object ExampleJsonConnector extends JsonConnector {
-
-  implicit val json4sFormats: Formats = DefaultFormats
-
-  override def toEventJson(data: JObject): JObject = {
-    val common = try {
-      data.extract[Common]
-    } catch {
-      case e: Exception => throw new ConnectorException(
-        s"Cannot extract Common field from ${data}. ${e.getMessage()}", e)
-    }
-
-    val json = try {
-      common.`type` match {
-        case "userAction" =>
-          toEventJson(common = common, userAction = data.extract[UserAction])
-        case "userActionItem" =>
-          toEventJson(common = common, userActionItem = data.extract[UserActionItem])
-        case x: String =>
-          throw new ConnectorException(
-            s"Cannot convert unknown type '${x}' to Event JSON.")
-      }
-    } catch {
-      case e: ConnectorException => throw e
-      case e: Exception => throw new ConnectorException(
-        s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e)
-    }
-
-    json
-  }
-
-  def toEventJson(common: Common, userAction: UserAction): JObject = {
-    import org.json4s.JsonDSL._
-
-    // map to EventAPI JSON
-    val json =
-      ("event" -> userAction.event) ~
-        ("entityType" -> "user") ~
-        ("entityId" -> userAction.userId) ~
-        ("eventTime" -> userAction.timestamp) ~
-        ("properties" -> (
-          ("context" -> userAction.context) ~
-            ("anotherProperty1" -> userAction.anotherProperty1) ~
-            ("anotherProperty2" -> userAction.anotherProperty2)
-          ))
-    json
-  }
-
-  def toEventJson(common: Common, userActionItem: UserActionItem): JObject = {
-    import org.json4s.JsonDSL._
-
-    // map to EventAPI JSON
-    val json =
-      ("event" -> userActionItem.event) ~
-        ("entityType" -> "user") ~
-        ("entityId" -> userActionItem.userId) ~
-        ("targetEntityType" -> "item") ~
-        ("targetEntityId" -> userActionItem.itemId) ~
-        ("eventTime" -> userActionItem.timestamp) ~
-        ("properties" -> (
-          ("context" -> userActionItem.context) ~
-            ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~
-            ("anotherPropertyB" -> userActionItem.anotherPropertyB)
-          ))
-    json
-  }
-
-  // Common required fields
-  case class Common(
-    `type`: String
-  )
-
-  // User Actions fields
-  case class UserAction (
-    userId: String,
-    event: String,
-    context: Option[JObject],
-    anotherProperty1: Int,
-    anotherProperty2: Option[String],
-    timestamp: String
-  )
-
-  // UserActionItem fields
-  case class UserActionItem (
-    userId: String,
-    event: String,
-    itemId: String,
-    context: JObject,
-    anotherPropertyA: Option[Double],
-    anotherPropertyB: Option[Boolean],
-    timestamp: String
-  )
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
deleted file mode 100644
index b2793a0..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
+++ /dev/null
@@ -1,305 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-package io.prediction.data.webhooks.mailchimp
-
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorException
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.Utils
-
-import org.json4s.JObject
-
-import org.joda.time.DateTime
-import org.joda.time.format.DateTimeFormat
-
-private[prediction] object MailChimpConnector extends FormConnector {
-
-  override
-  def toEventJson(data: Map[String, String]): JObject = {
-
-    val json = data.get("type") match {
-      case Some("subscribe") => subscribeToEventJson(data)
-      // UNSUBSCRIBE
-      case Some("unsubscribe") => unsubscribeToEventJson(data)
-      // PROFILE UPDATES
-      case Some("profile") => profileToEventJson(data)
-      // EMAIL UPDATE
-      case Some("upemail") => upemailToEventJson(data)
-      // CLEANED EMAILS
-      case Some("cleaned") => cleanedToEventJson(data)
-      // CAMPAIGN SENDING STATUS
-      case Some("campaign") => campaignToEventJson(data)
-      // invalid type
-      case Some(x) => throw new ConnectorException(
-        s"Cannot convert unknown MailChimp data type ${x} to event JSON")
-      case None => throw new ConnectorException(
-        s"The field 'type' is required for MailChimp data.")
-    }
-    json
-  }
-
-
-  val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
-    .withZone(EventValidation.defaultTimeZone)
-
-  def parseMailChimpDateTime(s: String): DateTime = {
-    mailChimpDateTimeFormat.parseDateTime(s)
-  }
-
-  def subscribeToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    "type": "subscribe",
-    "fired_at": "2009-03-26 21:35:57",
-    "data[id]": "8a25ff1d98",
-    "data[list_id]": "a6b5da1054",
-    "data[email]": "api@mailchimp.com",
-    "data[email_type]": "html",
-    "data[merges][EMAIL]": "api@mailchimp.com",
-    "data[merges][FNAME]": "MailChimp",
-    "data[merges][LNAME]": "API",
-    "data[merges][INTERESTS]": "Group1,Group2",
-    "data[ip_opt]": "10.20.10.30",
-    "data[ip_signup]": "10.20.10.30"
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    // TODO: handle optional fields
-    val json =
-      ("event" -> "subscribe") ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("data[id]")) ~
-      ("targetEntityType" -> "list") ~
-      ("targetEntityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("email" -> data("data[email]")) ~
-        ("email_type" -> data("data[email_type]")) ~
-        ("merges" -> (
-          ("EMAIL" -> data("data[merges][EMAIL]")) ~
-          ("FNAME" -> data("data[merges][FNAME]"))) ~
-          ("LNAME" -> data("data[merges][LNAME]")) ~
-          ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
-        )) ~
-        ("ip_opt" -> data("data[ip_opt]")) ~
-        ("ip_signup" -> data("data[ip_signup]")
-      ))
-
-    json
-
-  }
-
-  def unsubscribeToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    "action" will either be "unsub" or "delete".
-    The reason will be "manual" unless caused by a spam complaint - then it will be "abuse"
-
-    "type": "unsubscribe",
-    "fired_at": "2009-03-26 21:40:57",
-    "data[action]": "unsub",
-    "data[reason]": "manual",
-    "data[id]": "8a25ff1d98",
-    "data[list_id]": "a6b5da1054",
-    "data[email]": "api+unsub@mailchimp.com",
-    "data[email_type]": "html",
-    "data[merges][EMAIL]": "api+unsub@mailchimp.com",
-    "data[merges][FNAME]": "MailChimp",
-    "data[merges][LNAME]": "API",
-    "data[merges][INTERESTS]": "Group1,Group2",
-    "data[ip_opt]": "10.20.10.30",
-    "data[campaign_id]": "cb398d21d2",
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    val json =
-      ("event" -> "unsubscribe") ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("data[id]")) ~
-      ("targetEntityType" -> "list") ~
-      ("targetEntityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("action" -> data("data[action]")) ~
-        ("reason" -> data("data[reason]")) ~
-        ("email" -> data("data[email]")) ~
-        ("email_type" -> data("data[email_type]")) ~
-        ("merges" -> (
-          ("EMAIL" -> data("data[merges][EMAIL]")) ~
-          ("FNAME" -> data("data[merges][FNAME]"))) ~
-          ("LNAME" -> data("data[merges][LNAME]")) ~
-          ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
-        )) ~
-        ("ip_opt" -> data("data[ip_opt]")) ~
-        ("campaign_id" -> data("data[campaign_id]")
-      ))
-
-    json
-
-  }
-
-  def profileToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    "type": "profile",
-    "fired_at": "2009-03-26 21:31:21",
-    "data[id]": "8a25ff1d98",
-    "data[list_id]": "a6b5da1054",
-    "data[email]": "api@mailchimp.com",
-    "data[email_type]": "html",
-    "data[merges][EMAIL]": "api@mailchimp.com",
-    "data[merges][FNAME]": "MailChimp",
-    "data[merges][LNAME]": "API",
-    "data[merges][INTERESTS]": "Group1,Group2", \\OPTIONAL
-    "data[ip_opt]": "10.20.10.30"
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    val json =
-      ("event" -> "profile") ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("data[id]")) ~
-      ("targetEntityType" -> "list") ~
-      ("targetEntityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("email" -> data("data[email]")) ~
-        ("email_type" -> data("data[email_type]")) ~
-        ("merges" -> (
-          ("EMAIL" -> data("data[merges][EMAIL]")) ~
-          ("FNAME" -> data("data[merges][FNAME]"))) ~
-          ("LNAME" -> data("data[merges][LNAME]")) ~
-          ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
-        )) ~
-        ("ip_opt" -> data("data[ip_opt]")
-      ))
-
-    json
-
-  }
-
-  def upemailToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    "type": "upemail",
-    "fired_at": "2009-03-26 22:15:09",
-    "data[list_id]": "a6b5da1054",
-    "data[new_id]": "51da8c3259",
-    "data[new_email]": "api+new@mailchimp.com",
-    "data[old_email]": "api+old@mailchimp.com"
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    val json =
-      ("event" -> "upemail") ~
-      ("entityType" -> "user") ~
-      ("entityId" -> data("data[new_id]")) ~
-      ("targetEntityType" -> "list") ~
-      ("targetEntityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("new_email" -> data("data[new_email]")) ~
-        ("old_email" -> data("data[old_email]"))
-      ))
-
-    json
-
-  }
-
-  def cleanedToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    Reason will be one of "hard" (for hard bounces) or "abuse"
-    "type": "cleaned",
-    "fired_at": "2009-03-26 22:01:00",
-    "data[list_id]": "a6b5da1054",
-    "data[campaign_id]": "4fjk2ma9xd",
-    "data[reason]": "hard",
-    "data[email]": "api+cleaned@mailchimp.com"
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    val json =
-      ("event" -> "cleaned") ~
-      ("entityType" -> "list") ~
-      ("entityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("campaignId" -> data("data[campaign_id]")) ~
-        ("reason" -> data("data[reason]")) ~
-        ("email" -> data("data[email]"))
-      ))
-
-    json
-
-  }
-
-  def campaignToEventJson(data: Map[String, String]): JObject = {
-
-    import org.json4s.JsonDSL._
-
-    /*
-    "type": "campaign",
-    "fired_at": "2009-03-26 21:31:21",
-    "data[id]": "5aa2102003",
-    "data[subject]": "Test Campaign Subject",
-    "data[status]": "sent",
-    "data[reason]": "",
-    "data[list_id]": "a6b5da1054"
-    */
-
-    // convert to ISO8601 format
-    val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
-    val json =
-      ("event" -> "campaign") ~
-      ("entityType" -> "campaign") ~
-      ("entityId" -> data("data[id]")) ~
-      ("targetEntityType" -> "list") ~
-      ("targetEntityId" -> data("data[list_id]")) ~
-      ("eventTime" -> eventTime) ~
-      ("properties" -> (
-        ("subject" -> data("data[subject]")) ~
-        ("status" -> data("data[status]")) ~
-        ("reason" -> data("data[reason]"))
-      ))
-
-    json
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
deleted file mode 100644
index 318043c..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.data.webhooks.segmentio
-
-import io.prediction.data.webhooks.{ConnectorException, JsonConnector}
-import org.json4s._
-
-private[prediction] object SegmentIOConnector extends JsonConnector {
-
-  // private lazy val supportedAPI = Vector("2", "2.0", "2.0.0")
-
-  implicit val json4sFormats: Formats = DefaultFormats
-
-  override
-  def toEventJson(data: JObject): JObject = {
-    try {
-      val version: String = data.values("version").toString
-/*
-      if (!supportedAPI.contains(version)) {
-        throw new ConnectorException(
-          s"Supported segment.io API versions: [2]. got [$version]"
-        )
-      }
-*/
-    } catch { case _: Throwable \u21d2
-      throw new ConnectorException(s"Failed to get segment.io API version.")
-    }
-
-    val common = try {
-      data.extract[Common]
-    } catch {
-      case e: Throwable \u21d2 throw new ConnectorException(
-        s"Cannot extract Common field from $data. ${e.getMessage}", e
-      )
-    }
-
-    try {
-      common.`type` match {
-        case "identify" \u21d2
-          toEventJson(
-            common = common,
-            identify = data.extract[Events.Identify]
-          )
-
-        case "track" \u21d2
-          toEventJson(
-            common = common,
-            track = data.extract[Events.Track]
-          )
-
-        case "alias" \u21d2
-          toEventJson(
-            common = common,
-            alias = data.extract[Events.Alias]
-          )
-
-        case "page" \u21d2
-          toEventJson(
-            common = common,
-            page = data.extract[Events.Page]
-          )
-
-        case "screen" \u21d2
-          toEventJson(
-            common = common,
-            screen = data.extract[Events.Screen]
-          )
-
-        case "group" \u21d2
-          toEventJson(
-            common = common,
-            group = data.extract[Events.Group]
-          )
-
-        case _ \u21d2
-          throw new ConnectorException(
-            s"Cannot convert unknown type ${common.`type`} to event JSON."
-          )
-      }
-    } catch {
-      case e: ConnectorException => throw e
-      case e: Exception =>
-        throw new ConnectorException(
-          s"Cannot convert $data to event JSON. ${e.getMessage}", e
-        )
-    }
-  }
-
-  def toEventJson(common: Common, identify: Events.Identify ): JObject = {
-    import org.json4s.JsonDSL._
-    val eventProperties = "traits" \u2192 identify.traits
-    toJson(common, eventProperties)
-  }
-
-  def toEventJson(common: Common, track: Events.Track): JObject = {
-    import org.json4s.JsonDSL._
-    val eventProperties =
-      ("properties" \u2192 track.properties) ~
-      ("event" \u2192 track.event)
-    toJson(common, eventProperties)
-  }
-
-  def toEventJson(common: Common, alias: Events.Alias): JObject = {
-    import org.json4s.JsonDSL._
-    toJson(common, "previous_id" \u2192 alias.previous_id)
-  }
-
-  def toEventJson(common: Common, screen: Events.Screen): JObject = {
-    import org.json4s.JsonDSL._
-    val eventProperties =
-      ("name" \u2192 screen.name) ~
-      ("properties" \u2192 screen.properties)
-    toJson(common, eventProperties)
-  }
-
-  def toEventJson(common: Common, page: Events.Page): JObject = {
-    import org.json4s.JsonDSL._
-    val eventProperties =
-      ("name" \u2192 page.name) ~
-      ("properties" \u2192 page.properties)
-    toJson(common, eventProperties)
-  }
-
-  def toEventJson(common: Common, group: Events.Group): JObject = {
-    import org.json4s.JsonDSL._
-    val eventProperties =
-      ("group_id" \u2192 group.group_id) ~
-      ("traits" \u2192 group.traits)
-    toJson(common, eventProperties)
-  }
-
-  private def toJson(common: Common, props: JObject): JsonAST.JObject = {
-    val commonFields = commonToJson(common)
-    JObject(("properties" \u2192 properties(common, props)) :: commonFields.obj)
-  }
-
-  private def properties(common: Common, eventProps: JObject): JObject = {
-    import org.json4s.JsonDSL._
-    common.context map { context \u21d2
-      try {
-        ("context" \u2192 Extraction.decompose(context)) ~ eventProps
-      } catch {
-        case e: Throwable \u21d2
-          throw new ConnectorException(
-            s"Cannot convert $context to event JSON. ${e.getMessage }", e
-          )
-      }
-    } getOrElse eventProps
-  }
-
-  private def commonToJson(common: Common): JObject =
-    commonToJson(common, common.`type`)
-
-  private def commonToJson(common: Common, typ: String): JObject = {
-    import org.json4s.JsonDSL._
-      common.user_id.orElse(common.anonymous_id) match {
-        case Some(userId) \u21d2
-          ("event" \u2192 typ) ~
-            ("entityType" \u2192 "user") ~
-            ("entityId" \u2192 userId) ~
-            ("eventTime" \u2192 common.timestamp)
-
-        case None \u21d2
-          throw new ConnectorException(
-            "there was no `userId` or `anonymousId` in the common fields."
-          )
-      }
-  }
-}
-
-object Events {
-
-  private[prediction] case class Track(
-    event: String,
-    properties: Option[JObject] = None
-  )
-
-  private[prediction] case class Alias(previous_id: String, user_id: String)
-
-  private[prediction] case class Group(
-    group_id: String,
-    traits: Option[JObject] = None
-  )
-
-  private[prediction] case class Screen(
-    name: Option[String] = None,
-    properties: Option[JObject] = None
-  )
-
-  private[prediction] case class Page(
-    name: Option[String] = None,
-    properties: Option[JObject] = None
-  )
-
-  private[prediction] case class Identify(
-    user_id: String,
-    traits: Option[JObject]
-  )
-
-}
-
-object Common {
-
-  private[prediction] case class Integrations(
-    All: Boolean = false,
-    Mixpanel: Boolean = false,
-    Marketo: Boolean = false,
-    Salesforse: Boolean = false
-  )
-
-  private[prediction] case class Context(
-    ip: String,
-    library: Library,
-    user_agent: String,
-    app: Option[App] = None,
-    campaign: Option[Campaign] = None,
-    device: Option[Device] = None,
-    network: Option[Network] = None,
-    location: Option[Location] = None,
-    os: Option[OS] = None,
-    referrer: Option[Referrer] = None,
-    screen: Option[Screen] = None,
-    timezone: Option[String] = None
-  )
-
-  private[prediction] case class Screen(width: Int, height: Int, density: Int)
-
-  private[prediction] case class Referrer(id: String, `type`: String)
-
-  private[prediction] case class OS(name: String, version: String)
-
-  private[prediction] case class Location(
-    city: Option[String] = None,
-    country: Option[String] = None,
-    latitude: Option[Double] = None,
-    longitude: Option[Double] = None,
-    speed: Option[Int] = None
-  )
-
-  case class Page(
-    path: String,
-    referrer: String,
-    search: String,
-    title: String,
-    url: String
-  )
-
-  private[prediction] case class Network(
-    bluetooth: Option[Boolean] = None,
-    carrier: Option[String] = None,
-    cellular: Option[Boolean] = None,
-    wifi: Option[Boolean] = None
-  )
-
-  private[prediction] case class Library(name: String, version: String)
-
-  private[prediction] case class Device(
-    id: Option[String] = None,
-    advertising_id: Option[String] = None,
-    ad_tracking_enabled: Option[Boolean] = None,
-    manufacturer: Option[String] = None,
-    model: Option[String] = None,
-    name: Option[String] = None,
-    `type`: Option[String] = None,
-    token: Option[String] = None
-  )
-
-  private[prediction] case class Campaign(
-    name: Option[String] = None,
-    source: Option[String] = None,
-    medium: Option[String] = None,
-    term: Option[String] = None,
-    content: Option[String] = None
-  )
-
-  private[prediction] case class App(
-    name: Option[String] = None,
-    version: Option[String] = None,
-    build: Option[String] = None
-  )
-
-}
-
-private[prediction] case class Common(
-  `type`: String,
-  sent_at: String,
-  timestamp: String,
-  version: String,
-  anonymous_id: Option[String] = None,
-  user_id: Option[String] = None,
-  context: Option[Common.Context] = None,
-  integrations: Option[Common.Integrations] = None
-)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/Utils.scala
new file mode 100644
index 0000000..db8c7a2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/Utils.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data
+
+import org.joda.time.DateTime
+import org.joda.time.format.ISODateTimeFormat
+
+import java.lang.IllegalArgumentException
+
+private[prediction] object Utils {
+
+  // use dateTime() for strict ISO8601 format
+  val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed()
+
+  val dateTimeNoMillisFormatter =
+    ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed()
+
+  def stringToDateTime(dt: String): DateTime = {
+    // We accept two formats.
+    // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
+    // 2. "yyyy-MM-dd'T'HH:mm:ssZZ"
+    // The first one also takes milliseconds into account.
+    try {
+      // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
+      dateTimeFormatter.parseDateTime(dt)
+    } catch {
+      case e: IllegalArgumentException => {
+        // handle when the datetime string doesn't specify milliseconds.
+        dateTimeNoMillisFormatter.parseDateTime(dt)
+      }
+    }
+  }
+
+  def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt)
+    // dt.toString
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
new file mode 100644
index 0000000..c380daa
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
@@ -0,0 +1,80 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.apache.predictionio.data.storage.StorageException
+
+import spray.routing._
+import spray.routing.Directives._
+import spray.routing.Rejection
+import spray.http.StatusCodes
+import spray.http.StatusCode
+import spray.httpx.Json4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+
+object Common {
+
+  object Json4sProtocol extends Json4sSupport {
+    implicit def json4sFormats: Formats = DefaultFormats
+  }
+
+  import Json4sProtocol._
+
+  val rejectionHandler = RejectionHandler {
+    case MalformedRequestContentRejection(msg, _) :: _ =>
+      complete(StatusCodes.BadRequest, Map("message" -> msg))
+    case MissingQueryParamRejection(msg) :: _ =>
+      complete(StatusCodes.NotFound,
+        Map("message" -> s"missing required query parameter ${msg}."))
+    case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
+      val msg = cause match {
+        case AuthenticationFailedRejection.CredentialsRejected =>
+          "Invalid accessKey."
+        case AuthenticationFailedRejection.CredentialsMissing =>
+          "Missing accessKey."
+      }
+      complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
+    }
+    case ChannelRejection(msg) :: _ =>
+      complete(StatusCodes.Unauthorized, Map("message" -> msg))
+    case NonExistentAppRejection(msg) :: _ =>
+      complete(StatusCodes.Unauthorized, Map("message" -> msg))
+  }
+
+  val exceptionHandler = ExceptionHandler {
+    case e: ConnectorException => {
+      val msg = s"${e.getMessage()}"
+      complete(StatusCodes.BadRequest, Map("message" -> msg))
+    }
+    case e: StorageException => {
+      val msg = s"${e.getMessage()}"
+      complete(StatusCodes.InternalServerError, Map("message" -> msg))
+    }
+    case e: Exception => {
+      val msg = s"${e.getMessage()}"
+      complete(StatusCodes.InternalServerError, Map("message" -> msg))
+    }
+  }
+}
+
+/** invalid channel */
+case class ChannelRejection(msg: String) extends Rejection
+
+/** the app doesn't exist */
+case class NonExistentAppRejection(msg: String) extends Rejection

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
new file mode 100644
index 0000000..e25234f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
@@ -0,0 +1,24 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+case class EventInfo(
+  appId: Int,
+  channelId: Option[Int],
+  event: Event)
+