You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2016/07/18 20:17:44 UTC
[13/34] incubator-predictionio git commit: rename all except examples
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/LEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/LEventStore.scala b/data/src/main/scala/io/prediction/data/store/LEventStore.scala
deleted file mode 100644
index be543eb..0000000
--- a/data/src/main/scala/io/prediction/data/store/LEventStore.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.store
-
-import io.prediction.data.storage.Storage
-import io.prediction.data.storage.Event
-
-import org.joda.time.DateTime
-
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-
-/** This object provides a set of operation to access Event Store
- * without going through Spark's parallelization
- */
-object LEventStore {
-
- private val defaultTimeout = Duration(60, "seconds")
-
- @transient lazy private val eventsDb = Storage.getLEvents()
-
- /** Reads events of the specified entity. May use this in Algorithm's predict()
- * or Serving logic to have fast event store access.
- *
- * @param appName return events of this app
- * @param entityType return events of this entityType
- * @param entityId return events of this entityId
- * @param channelName return events of this channel (default channel if it's None)
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param limit Limit number of events. Get all events if None or Some(-1)
- * @param latest Return latest event first (default true)
- * @return Iterator[Event]
- */
- def findByEntity(
- appName: String,
- entityType: String,
- entityId: String,
- channelName: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- limit: Option[Int] = None,
- latest: Boolean = true,
- timeout: Duration = defaultTimeout): Iterator[Event] = {
-
- val (appId, channelId) = Common.appNameToId(appName, channelName)
-
- Await.result(eventsDb.futureFind(
- appId = appId,
- channelId = channelId,
- startTime = startTime,
- untilTime = untilTime,
- entityType = Some(entityType),
- entityId = Some(entityId),
- eventNames = eventNames,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId,
- limit = limit,
- reversed = Some(latest)),
- timeout)
- }
-
- /** Reads events generically. If entityType or entityId is not specified, it
- * results in table scan.
- *
- * @param appName return events of this app
- * @param entityType return events of this entityType
- * - None means no restriction on entityType
- * - Some(x) means entityType should match x.
- * @param entityId return events of this entityId
- * - None means no restriction on entityId
- * - Some(x) means entityId should match x.
- * @param channelName return events of this channel (default channel if it's None)
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param limit Limit number of events. Get all events if None or Some(-1)
- * @return Iterator[Event]
- */
- def find(
- appName: String,
- entityType: Option[String] = None,
- entityId: Option[String] = None,
- channelName: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- limit: Option[Int] = None,
- timeout: Duration = defaultTimeout): Iterator[Event] = {
-
- val (appId, channelId) = Common.appNameToId(appName, channelName)
-
- Await.result(eventsDb.futureFind(
- appId = appId,
- channelId = channelId,
- startTime = startTime,
- untilTime = untilTime,
- entityType = entityType,
- entityId = entityId,
- eventNames = eventNames,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId,
- limit = limit), timeout)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/PEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/PEventStore.scala b/data/src/main/scala/io/prediction/data/store/PEventStore.scala
deleted file mode 100644
index cd20da9..0000000
--- a/data/src/main/scala/io/prediction/data/store/PEventStore.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.store
-
-import io.prediction.data.storage.Storage
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.PropertyMap
-
-import org.joda.time.DateTime
-
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** This object provides a set of operation to access Event Store
- * with Spark's parallelization
- */
-object PEventStore {
-
- @transient lazy private val eventsDb = Storage.getPEvents()
-
- /** Read events from Event Store
- *
- * @param appName return events of this app
- * @param channelName return events of this channel (default channel if it's None)
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param entityType return events of this entityType
- * @param entityId return events of this entityId
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param sc Spark context
- * @return RDD[Event]
- */
- def find(
- appName: String,
- channelName: Option[String] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- entityType: Option[String] = None,
- entityId: Option[String] = None,
- eventNames: Option[Seq[String]] = None,
- targetEntityType: Option[Option[String]] = None,
- targetEntityId: Option[Option[String]] = None
- )(sc: SparkContext): RDD[Event] = {
-
- val (appId, channelId) = Common.appNameToId(appName, channelName)
-
- eventsDb.find(
- appId = appId,
- channelId = channelId,
- startTime = startTime,
- untilTime = untilTime,
- entityType = entityType,
- entityId = entityId,
- eventNames = eventNames,
- targetEntityType = targetEntityType,
- targetEntityId = targetEntityId
- )(sc)
-
- }
-
- /** Aggregate properties of entities based on these special events:
- * \$set, \$unset, \$delete events.
- *
- * @param appName use events of this app
- * @param entityType aggregate properties of the entities of this entityType
- * @param channelName use events of this channel (default channel if it's None)
- * @param startTime use events with eventTime >= startTime
- * @param untilTime use events with eventTime < untilTime
- * @param required only keep entities with these required properties defined
- * @param sc Spark context
- * @return RDD[(String, PropertyMap)] RDD of entityId and PropetyMap pair
- */
- def aggregateProperties(
- appName: String,
- entityType: String,
- channelName: Option[String] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- required: Option[Seq[String]] = None)
- (sc: SparkContext): RDD[(String, PropertyMap)] = {
-
- val (appId, channelId) = Common.appNameToId(appName, channelName)
-
- eventsDb.aggregateProperties(
- appId = appId,
- entityType = entityType,
- channelId = channelId,
- startTime = startTime,
- untilTime = untilTime,
- required = required
- )(sc)
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
deleted file mode 100644
index d619f65..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/LJavaEventStore.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.store.java
-
-import io.prediction.data.storage.Event
-import io.prediction.data.store.LEventStore
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions
-import scala.concurrent.duration.Duration
-
-/** This Java-friendly object provides a set of operation to access Event Store
- * without going through Spark's parallelization
- */
-object LJavaEventStore {
-
- /** Reads events of the specified entity. May use this in Algorithm's predict()
- * or Serving logic to have fast event store access.
- *
- * @param appName return events of this app
- * @param entityType return events of this entityType
- * @param entityId return events of this entityId
- * @param channelName return events of this channel (default channel if it's None)
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param limit Limit number of events. Get all events if None or Some(-1)
- * @param latest Return latest event first
- * @return java.util.List[Event]
- */
- def findByEntity(
- appName: String,
- entityType: String,
- entityId: String,
- channelName: Option[String],
- eventNames: Option[java.util.List[String]],
- targetEntityType: Option[Option[String]],
- targetEntityId: Option[Option[String]],
- startTime: Option[DateTime],
- untilTime: Option[DateTime],
- limit: Option[Integer],
- latest: Boolean,
- timeout: Duration): java.util.List[Event] = {
-
- val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
- val limitInt = limit.map(_.intValue())
-
- JavaConversions.seqAsJavaList(
- LEventStore.findByEntity(
- appName,
- entityType,
- entityId,
- channelName,
- eventNamesSeq,
- targetEntityType,
- targetEntityId,
- startTime,
- untilTime,
- limitInt,
- latest,
- timeout
- ).toSeq)
- }
-
- /** Reads events generically. If entityType or entityId is not specified, it
- * results in table scan.
- *
- * @param appName return events of this app
- * @param entityType return events of this entityType
- * - None means no restriction on entityType
- * - Some(x) means entityType should match x.
- * @param entityId return events of this entityId
- * - None means no restriction on entityId
- * - Some(x) means entityId should match x.
- * @param channelName return events of this channel (default channel if it's None)
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param limit Limit number of events. Get all events if None or Some(-1)
- * @return java.util.List[Event]
- */
- def find(
- appName: String,
- entityType: Option[String],
- entityId: Option[String],
- channelName: Option[String],
- eventNames: Option[java.util.List[String]],
- targetEntityType: Option[Option[String]],
- targetEntityId: Option[Option[String]],
- startTime: Option[DateTime],
- untilTime: Option[DateTime],
- limit: Option[Integer],
- timeout: Duration): java.util.List[Event] = {
-
- val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
- val limitInt = limit.map(_.intValue())
-
- JavaConversions.seqAsJavaList(
- LEventStore.find(
- appName,
- entityType,
- entityId,
- channelName,
- eventNamesSeq,
- targetEntityType,
- targetEntityId,
- startTime,
- untilTime,
- limitInt,
- timeout
- ).toSeq)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala b/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
deleted file mode 100644
index dee608d..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/OptionHelper.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.store.java
-
-/** Used by Java-based engines to mock Some and None */
-object OptionHelper {
- /** Mimics a None from Java-based engine */
- def none[T]: Option[T] = {
- Option(null.asInstanceOf[T])
- }
-
- /** Mimics a Some from Java-based engine */
- def some[T](value: T): Option[T] = {
- Some(value)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala b/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
deleted file mode 100644
index c0657d2..0000000
--- a/data/src/main/scala/io/prediction/data/store/java/PJavaEventStore.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.store.java
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.PropertyMap
-import io.prediction.data.store.PEventStore
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.JavaRDD
-import org.joda.time.DateTime
-
-import scala.collection.JavaConversions
-
-/** This Java-friendly object provides a set of operation to access Event Store
- * with Spark's parallelization
- */
-object PJavaEventStore {
-
- /** Read events from Event Store
- *
- * @param appName return events of this app
- * @param channelName return events of this channel (default channel if it's None)
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param entityType return events of this entityType
- * @param entityId return events of this entityId
- * @param eventNames return events with any of these event names.
- * @param targetEntityType return events of this targetEntityType:
- * - None means no restriction on targetEntityType
- * - Some(None) means no targetEntityType for this event
- * - Some(Some(x)) means targetEntityType should match x.
- * @param targetEntityId return events of this targetEntityId
- * - None means no restriction on targetEntityId
- * - Some(None) means no targetEntityId for this event
- * - Some(Some(x)) means targetEntityId should match x.
- * @param sc Spark context
- * @return JavaRDD[Event]
- */
- def find(
- appName: String,
- channelName: Option[String],
- startTime: Option[DateTime],
- untilTime: Option[DateTime],
- entityType: Option[String],
- entityId: Option[String],
- eventNames: Option[java.util.List[String]],
- targetEntityType: Option[Option[String]],
- targetEntityId: Option[Option[String]],
- sc: SparkContext): JavaRDD[Event] = {
-
- val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
-
- PEventStore.find(
- appName,
- channelName,
- startTime,
- untilTime,
- entityType,
- entityId,
- eventNamesSeq,
- targetEntityType,
- targetEntityId
- )(sc)
- }
-
- /** Aggregate properties of entities based on these special events:
- * \$set, \$unset, \$delete events.
- *
- * @param appName use events of this app
- * @param entityType aggregate properties of the entities of this entityType
- * @param channelName use events of this channel (default channel if it's None)
- * @param startTime use events with eventTime >= startTime
- * @param untilTime use events with eventTime < untilTime
- * @param required only keep entities with these required properties defined
- * @param sc Spark context
- * @return JavaRDD[(String, PropertyMap)] JavaRDD of entityId and PropetyMap pair
- */
- def aggregateProperties(
- appName: String,
- entityType: String,
- channelName: Option[String],
- startTime: Option[DateTime],
- untilTime: Option[DateTime],
- required: Option[java.util.List[String]],
- sc: SparkContext): JavaRDD[(String, PropertyMap)] = {
-
- PEventStore.aggregateProperties(
- appName,
- entityType,
- channelName,
- startTime,
- untilTime
- )(sc)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/store/package.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/store/package.scala b/data/src/main/scala/io/prediction/data/store/package.scala
deleted file mode 100644
index 4856416..0000000
--- a/data/src/main/scala/io/prediction/data/store/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data
-
-/** Provides high level interfaces to the Event Store from within a prediction
- * engine.
- */
-package object store {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/DataView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/DataView.scala b/data/src/main/scala/io/prediction/data/view/DataView.scala
deleted file mode 100644
index 52a67fd..0000000
--- a/data/src/main/scala/io/prediction/data/view/DataView.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.view
-
-import io.prediction.annotation.Experimental
-import io.prediction.data.storage.Event
-
-import grizzled.slf4j.Logger
-import io.prediction.data.store.PEventStore
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SQLContext
-import org.joda.time.DateTime
-
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe._
-import scala.util.hashing.MurmurHash3
-
-/**
- * :: Experimental ::
- */
-@Experimental
-object DataView {
- /**
- * :: Experimental ::
- *
- * Create a DataFrame from events of a specified app.
- *
- * @param appName return events of this app
- * @param channelName use events of this channel (default channel if it's None)
- * @param startTime return events with eventTime >= startTime
- * @param untilTime return events with eventTime < untilTime
- * @param conversionFunction a function that turns raw Events into events of interest.
- * If conversionFunction returns None, such events are dropped.
- * @param name identify the DataFrame created
- * @param version used to track changes to the conversionFunction, e.g. version = "20150413"
- * and update whenever the function is changed.
- * @param sqlContext SQL context
- * @tparam E the output type of the conversion function. The type needs to extend Product
- * (e.g. case class)
- * @return a DataFrame of events
- */
- @Experimental
- def create[E <: Product: TypeTag: ClassTag](
- appName: String,
- channelName: Option[String] = None,
- startTime: Option[DateTime] = None,
- untilTime: Option[DateTime] = None,
- conversionFunction: Event => Option[E],
- name: String = "",
- version: String = "")(sqlContext: SQLContext): DataFrame = {
-
- @transient lazy val logger = Logger[this.type]
-
- val sc = sqlContext.sparkContext
-
- val beginTime = startTime match {
- case Some(t) => t
- case None => new DateTime(0L)
- }
- val endTime = untilTime match {
- case Some(t) => t
- case None => DateTime.now() // fix the current time
- }
- // detect changes to the case class
- val uid = java.io.ObjectStreamClass.lookup(implicitly[reflect.ClassTag[E]].runtimeClass)
- .getSerialVersionUID
- val hash = MurmurHash3.stringHash(s"$beginTime-$endTime-$version-$uid")
- val baseDir = s"${sys.env("PIO_FS_BASEDIR")}/view"
- val fileName = s"$baseDir/$name-$appName-$hash.parquet"
- try {
- sqlContext.parquetFile(fileName)
- } catch {
- case e: java.io.FileNotFoundException =>
- logger.info("Cached copy not found, reading from DB.")
- // if cached copy is found, use it. If not, grab from Storage
- val result: RDD[E] = PEventStore.find(
- appName = appName,
- channelName = channelName,
- startTime = startTime,
- untilTime = Some(endTime))(sc)
- .flatMap((e) => conversionFunction(e))
- import sqlContext.implicits._ // needed for RDD.toDF()
- val resultDF = result.toDF()
-
- resultDF.saveAsParquetFile(fileName)
- sqlContext.parquetFile(fileName)
- case e: java.lang.RuntimeException =>
- if (e.toString.contains("is not a Parquet file")) {
- logger.error(s"$fileName does not contain a valid Parquet file. " +
- "Please delete it and try again.")
- }
- throw e
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/LBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/LBatchView.scala b/data/src/main/scala/io/prediction/data/view/LBatchView.scala
deleted file mode 100644
index f806056..0000000
--- a/data/src/main/scala/io/prediction/data/view/LBatchView.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import org.joda.time.DateTime
-import scala.language.implicitConversions
-
-import scala.concurrent.ExecutionContext.Implicits.global // TODO
-
-@deprecated("Use LEvents or LEventStore instead.", "0.9.2")
-object ViewPredicates {
- def getStartTimePredicate(startTimeOpt: Option[DateTime])
- : (Event => Boolean) = {
- startTimeOpt.map(getStartTimePredicate).getOrElse(_ => true)
- }
-
- def getStartTimePredicate(startTime: DateTime): (Event => Boolean) = {
- e => (!(e.eventTime.isBefore(startTime) || e.eventTime.isEqual(startTime)))
- }
-
- def getUntilTimePredicate(untilTimeOpt: Option[DateTime])
- : (Event => Boolean) = {
- untilTimeOpt.map(getUntilTimePredicate).getOrElse(_ => true)
- }
-
- def getUntilTimePredicate(untilTime: DateTime): (Event => Boolean) = {
- _.eventTime.isBefore(untilTime)
- }
-
- def getEntityTypePredicate(entityTypeOpt: Option[String]): (Event => Boolean)
- = {
- entityTypeOpt.map(getEntityTypePredicate).getOrElse(_ => true)
- }
-
- def getEntityTypePredicate(entityType: String): (Event => Boolean) = {
- (_.entityType == entityType)
- }
-
- def getEventPredicate(eventOpt: Option[String]): (Event => Boolean)
- = {
- eventOpt.map(getEventPredicate).getOrElse(_ => true)
- }
-
- def getEventPredicate(event: String): (Event => Boolean) = {
- (_.event == event)
- }
-}
-
-@deprecated("Use LEvents instead.", "0.9.2")
-object ViewAggregators {
- def getDataMapAggregator(): ((Option[DataMap], Event) => Option[DataMap]) = {
- (p, e) => {
- e.event match {
- case "$set" => {
- if (p == None) {
- Some(e.properties)
- } else {
- p.map(_ ++ e.properties)
- }
- }
- case "$unset" => {
- if (p == None) {
- None
- } else {
- p.map(_ -- e.properties.keySet)
- }
- }
- case "$delete" => None
- case _ => p // do nothing for others
- }
- }
- }
-}
-
-@deprecated("Use LEvents instead.", "0.9.2")
-object EventSeq {
- // Need to
- // >>> import scala.language.implicitConversions
- // to enable implicit conversion. Only import in the code where this is
- // necessary to avoid confusion.
- implicit def eventSeqToList(es: EventSeq): List[Event] = es.events
- implicit def listToEventSeq(l: List[Event]): EventSeq = new EventSeq(l)
-}
-
-
-@deprecated("Use LEvents instead.", "0.9.2")
-class EventSeq(val events: List[Event]) {
- def filter(
- eventOpt: Option[String] = None,
- entityTypeOpt: Option[String] = None,
- startTimeOpt: Option[DateTime] = None,
- untilTimeOpt: Option[DateTime] = None): EventSeq = {
-
- events
- .filter(ViewPredicates.getEventPredicate(eventOpt))
- .filter(ViewPredicates.getStartTimePredicate(startTimeOpt))
- .filter(ViewPredicates.getUntilTimePredicate(untilTimeOpt))
- .filter(ViewPredicates.getEntityTypePredicate(entityTypeOpt))
- }
-
- def filter(p: (Event => Boolean)): EventSeq = events.filter(p)
-
- def aggregateByEntityOrdered[T](init: T, op: (T, Event) => T)
- : Map[String, T] = {
- events
- .groupBy( _.entityId )
- .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
- .toMap
- }
-
-
-}
-
-
-@deprecated("Use LEventStore instead.", "0.9.2")
-class LBatchView(
- val appId: Int,
- val startTime: Option[DateTime],
- val untilTime: Option[DateTime]) {
-
- @transient lazy val eventsDb = Storage.getLEvents()
-
- @transient lazy val _events = eventsDb.find(
- appId = appId,
- startTime = startTime,
- untilTime = untilTime).toList
-
- @transient lazy val events: EventSeq = new EventSeq(_events)
-
- /* Aggregate event data
- *
- * @param entityType only aggregate event with entityType
- * @param startTimeOpt if specified, only aggregate event after (inclusive)
- * startTimeOpt
- * @param untilTimeOpt if specified, only aggregate event until (exclusive)
- * endTimeOpt
- */
- def aggregateProperties(
- entityType: String,
- startTimeOpt: Option[DateTime] = None,
- untilTimeOpt: Option[DateTime] = None
- ): Map[String, DataMap] = {
-
- events
- .filter(entityTypeOpt = Some(entityType))
- .filter(e => EventValidation.isSpecialEvents(e.event))
- .aggregateByEntityOrdered(
- init = None,
- op = ViewAggregators.getDataMapAggregator())
- .filter{ case (k, v) => (v != None) }
- .mapValues(_.get)
-
- }
-
- /*
- def aggregateByEntityOrdered[T](
- predicate: Event => Boolean,
- init: T,
- op: (T, Event) => T): Map[String, T] = {
-
- _events
- .filter( predicate(_) )
- .groupBy( _.entityId )
- .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op))
- .toMap
-
- }
- */
-
- /*
- def groupByEntityOrdered[T](
- predicate: Event => Boolean,
- map: Event => T): Map[String, Seq[T]] = {
-
- _events
- .filter( predicate(_) )
- .groupBy( _.entityId )
- .mapValues( _.sortBy(_.eventTime.getMillis).map(map(_)) )
- .toMap
- }
- */
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/PBatchView.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/PBatchView.scala b/data/src/main/scala/io/prediction/data/view/PBatchView.scala
deleted file mode 100644
index 5b0f878..0000000
--- a/data/src/main/scala/io/prediction/data/view/PBatchView.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.hbase.HBPEvents
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import org.joda.time.DateTime
-
-import org.json4s.JValue
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-
-// each JValue data associated with the time it is set
-private[prediction] case class PropTime(val d: JValue, val t: Long) extends Serializable
-
-private[prediction] case class SetProp (
- val fields: Map[String, PropTime],
- // last set time. Note: fields could be empty with valid set time
- val t: Long) extends Serializable {
-
- def ++ (that: SetProp): SetProp = {
- val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
- val common: Map[String, PropTime] = commonKeys.map { k =>
- val thisData = this.fields(k)
- val thatData = that.fields(k)
- // only keep the value with latest time
- val v = if (thisData.t > thatData.t) thisData else thatData
- (k, v)
- }.toMap
-
- val combinedFields = common ++
- (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
- // keep the latest set time
- val combinedT = if (this.t > that.t) this.t else that.t
-
- SetProp(
- fields = combinedFields,
- t = combinedT
- )
- }
-}
-
-private[prediction] case class UnsetProp (fields: Map[String, Long]) extends Serializable {
- def ++ (that: UnsetProp): UnsetProp = {
- val commonKeys = fields.keySet.intersect(that.fields.keySet)
-
- val common: Map[String, Long] = commonKeys.map { k =>
- val thisData = this.fields(k)
- val thatData = that.fields(k)
- // only keep the value with latest time
- val v = if (thisData > thatData) thisData else thatData
- (k, v)
- }.toMap
-
- val combinedFields = common ++
- (this.fields -- commonKeys) ++ (that.fields -- commonKeys)
-
- UnsetProp(
- fields = combinedFields
- )
- }
-}
-
-private[prediction] case class DeleteEntity (t: Long) extends Serializable {
- def ++ (that: DeleteEntity): DeleteEntity = {
- if (this.t > that.t) this else that
- }
-}
-
-private[prediction] case class EventOp (
- val setProp: Option[SetProp] = None,
- val unsetProp: Option[UnsetProp] = None,
- val deleteEntity: Option[DeleteEntity] = None
-) extends Serializable {
-
- def ++ (that: EventOp): EventOp = {
- EventOp(
- setProp = (setProp ++ that.setProp).reduceOption(_ ++ _),
- unsetProp = (unsetProp ++ that.unsetProp).reduceOption(_ ++ _),
- deleteEntity = (deleteEntity ++ that.deleteEntity).reduceOption(_ ++ _)
- )
- }
-
- def toDataMap(): Option[DataMap] = {
- setProp.flatMap { set =>
-
- val unsetKeys: Set[String] = unsetProp.map( unset =>
- unset.fields.filter{ case (k, v) => (v >= set.fields(k).t) }.keySet
- ).getOrElse(Set())
-
- val combinedFields = deleteEntity.map { delete =>
- if (delete.t >= set.t) {
- None
- } else {
- val deleteKeys: Set[String] = set.fields
- .filter { case (k, PropTime(kv, t)) =>
- (delete.t >= t)
- }.keySet
- Some(set.fields -- unsetKeys -- deleteKeys)
- }
- }.getOrElse{
- Some(set.fields -- unsetKeys)
- }
-
- // Note: mapValues() doesn't return concrete Map and causes
- // NotSerializableException issue. Use map(identity) to work around this.
- // see https://issues.scala-lang.org/browse/SI-7005
- combinedFields.map(f => DataMap(f.mapValues(_.d).map(identity)))
- }
- }
-
-}
-
-private[prediction] object EventOp {
- def apply(e: Event): EventOp = {
- val t = e.eventTime.getMillis
- e.event match {
- case "$set" => {
- val fields = e.properties.fields.mapValues(jv =>
- PropTime(jv, t)
- ).map(identity)
-
- EventOp(
- setProp = Some(SetProp(fields = fields, t = t))
- )
- }
- case "$unset" => {
- val fields = e.properties.fields.mapValues(jv => t).map(identity)
- EventOp(
- unsetProp = Some(UnsetProp(fields = fields))
- )
- }
- case "$delete" => {
- EventOp(
- deleteEntity = Some(DeleteEntity(t))
- )
- }
- case _ => {
- EventOp()
- }
- }
- }
-}
-
-@deprecated("Use PEvents or PEventStore instead.", "0.9.2")
-class PBatchView(
- val appId: Int,
- val startTime: Option[DateTime],
- val untilTime: Option[DateTime],
- val sc: SparkContext) {
-
- // NOTE: parallel Events DB interface
- @transient lazy val eventsDb = Storage.getPEvents()
-
- @transient lazy val _events: RDD[Event] =
- eventsDb.getByAppIdAndTimeAndEntity(
- appId = appId,
- startTime = startTime,
- untilTime = untilTime,
- entityType = None,
- entityId = None)(sc)
-
- // TODO: change to use EventSeq?
- @transient lazy val events: RDD[Event] = _events
-
- def aggregateProperties(
- entityType: String,
- startTimeOpt: Option[DateTime] = None,
- untilTimeOpt: Option[DateTime] = None
- ): RDD[(String, DataMap)] = {
-
- _events
- .filter( e => ((e.entityType == entityType) &&
- (EventValidation.isSpecialEvents(e.event))) )
- .map( e => (e.entityId, EventOp(e) ))
- .aggregateByKey[EventOp](EventOp())(
- // within same partition
- seqOp = { case (u, v) => u ++ v },
- // across partition
- combOp = { case (accu, u) => accu ++ u }
- )
- .mapValues(_.toDataMap)
- .filter{ case (k, v) => v.isDefined }
- .map{ case (k, v) => (k, v.get) }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/view/QuickTest.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/view/QuickTest.scala b/data/src/main/scala/io/prediction/data/view/QuickTest.scala
deleted file mode 100644
index 68ade1d..0000000
--- a/data/src/main/scala/io/prediction/data/view/QuickTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.view
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.LEvents
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.storage.DataMap
-import io.prediction.data.storage.Storage
-
-import scala.concurrent.ExecutionContext.Implicits.global // TODO
-
-import grizzled.slf4j.Logger
-import org.joda.time.DateTime
-
-import scala.language.implicitConversions
-
-class TestHBLEvents() {
- @transient lazy val eventsDb = Storage.getLEvents()
-
- def run(): Unit = {
- val r = eventsDb.find(
- appId = 1,
- startTime = None,
- untilTime = None,
- entityType = Some("pio_user"),
- entityId = Some("3")).toList
- println(r)
- }
-}
-
-class TestSource(val appId: Int) {
- @transient lazy val logger = Logger[this.type]
- @transient lazy val batchView = new LBatchView(appId,
- None, None)
-
- def run(): Unit = {
- println(batchView.events)
- }
-}
-
-object QuickTest {
-
- def main(args: Array[String]) {
- val t = new TestHBLEvents()
- t.run()
-
- // val ts = new TestSource(args(0).toInt)
- // ts.run()
- }
-}
-
-object TestEventTime {
- @transient lazy val batchView = new LBatchView(9, None, None)
-
- // implicit def back2list(es: EventSeq) = es.events
-
- def main(args: Array[String]) {
- val e = batchView.events.filter(
- eventOpt = Some("rate"),
- startTimeOpt = Some(new DateTime(1998, 1, 1, 0, 0))
- // untilTimeOpt = Some(new DateTime(1997, 1, 1, 0, 0))
- )
- // untilTimeOpt = Some(new DateTime(2000, 1, 1, 0, 0)))
-
- e.foreach { println }
- println()
- println()
- println()
- val u = batchView.aggregateProperties("pio_item")
- u.foreach { println }
- println()
- println()
- println()
-
- // val l: Seq[Event] = e
- val l = e.map { _.entityId }
- l.foreach { println }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
deleted file mode 100644
index 0b64afb..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorException.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks
-
-/** Webhooks Connnector Exception
- *
- * @param message the detail message
- * @param cause the cause
- */
-private[prediction] class ConnectorException(message: String, cause: Throwable)
- extends Exception(message, cause) {
-
- /** Webhooks Connnector Exception with cause being set to null
- *
- * @param message the detail message
- */
- def this(message: String) = this(message, null)
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala b/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
deleted file mode 100644
index 424b6ba..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/ConnectorUtil.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks
-
-import io.prediction.data.storage.Event
-import io.prediction.data.storage.EventJson4sSupport
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-
-private[prediction] object ConnectorUtil {
-
- implicit val eventJson4sFormats: Formats = DefaultFormats +
- new EventJson4sSupport.APISerializer
-
- // intentionally use EventJson4sSupport.APISerializer to convert
- // from JSON to Event object. Don't allow connector directly create
- // Event object so that the Event object formation is consistent
- // by enforcing JSON format
-
- def toEvent(connector: JsonConnector, data: JObject): Event = {
- read[Event](write(connector.toEventJson(data)))
- }
-
- def toEvent(connector: FormConnector, data: Map[String, String]): Event = {
- read[Event](write(connector.toEventJson(data)))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
deleted file mode 100644
index 9087f31..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/FormConnector.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks
-
-import org.json4s.JObject
-
-/** Connector for Webhooks connection with Form submission data format
- */
-private[prediction] trait FormConnector {
-
- // TODO: support conversion to multiple events?
-
- /** Convert from original Form submission data to Event JObject
- * @param data Map of key-value pairs in String type received through webhooks
- * @return Event JObject
- */
- def toEventJson(data: Map[String, String]): JObject
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
deleted file mode 100644
index e0e80fe..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/JsonConnector.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks
-
-import org.json4s.JObject
-
-/** Connector for Webhooks connection */
-private[prediction] trait JsonConnector {
-
- // TODO: support conversion to multiple events?
-
- /** Convert from original JObject to Event JObject
- * @param data original JObject recevived through webhooks
- * @return Event JObject
- */
- def toEventJson(data: JObject): JObject
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
deleted file mode 100644
index f19e009..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/exampleform/ExampleFormConnector.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.exampleform
-
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorException
-
-import org.json4s.JObject
-
-
-/** Example FormConnector with following types of webhook form data inputs:
- *
- * UserAction
- *
- * "type"="userAction"
- * "userId"="as34smg4",
- * "event"="do_something",
- * "context[ip]"="24.5.68.47", // optional
- * "context[prop1]"="2.345", // optional
- * "context[prop2]"="value1" // optional
- * "anotherProperty1"="100",
- * "anotherProperty2"="optional1", // optional
- * "timestamp"="2015-01-02T00:30:12.984Z"
- *
- * UserActionItem
- *
- * "type"="userActionItem"
- * "userId"="as34smg4",
- * "event"="do_something_on",
- * "itemId"="kfjd312bc",
- * "context[ip]"="1.23.4.56",
- * "context[prop1]"="2.345",
- * "context[prop2]"="value1",
- * "anotherPropertyA"="4.567", // optional
- * "anotherPropertyB"="false", // optional
- * "timestamp"="2015-01-15T04:20:23.567Z"
- *
- */
-private[prediction] object ExampleFormConnector extends FormConnector {
-
- override
- def toEventJson(data: Map[String, String]): JObject = {
- val json = try {
- data.get("type") match {
- case Some("userAction") => userActionToEventJson(data)
- case Some("userActionItem") => userActionItemToEventJson(data)
- case Some(x) => throw new ConnectorException(
- s"Cannot convert unknown type ${x} to event JSON")
- case None => throw new ConnectorException(
- s"The field 'type' is required.")
- }
- } catch {
- case e: ConnectorException => throw e
- case e: Exception => throw new ConnectorException(
- s"Cannot convert ${data} to event JSON. ${e.getMessage()}", e)
- }
- json
- }
-
- def userActionToEventJson(data: Map[String, String]): JObject = {
- import org.json4s.JsonDSL._
-
- // two level optional data
- val context = if (data.exists(_._1.startsWith("context["))) {
- Some(
- ("ip" -> data.get("context[ip]")) ~
- ("prop1" -> data.get("context[prop1]").map(_.toDouble)) ~
- ("prop2" -> data.get("context[prop2]"))
- )
- } else {
- None
- }
-
- val json =
- ("event" -> data("event")) ~
- ("entityType" -> "user") ~
- ("entityId" -> data("userId")) ~
- ("eventTime" -> data("timestamp")) ~
- ("properties" -> (
- ("context" -> context) ~
- ("anotherProperty1" -> data("anotherProperty1").toInt) ~
- ("anotherProperty2" -> data.get("anotherProperty2"))
- ))
- json
- }
-
-
- def userActionItemToEventJson(data: Map[String, String]): JObject = {
- import org.json4s.JsonDSL._
-
- val json =
- ("event" -> data("event")) ~
- ("entityType" -> "user") ~
- ("entityId" -> data("userId")) ~
- ("targetEntityType" -> "item") ~
- ("targetEntityId" -> data("itemId")) ~
- ("eventTime" -> data("timestamp")) ~
- ("properties" -> (
- ("context" -> (
- ("ip" -> data("context[ip]")) ~
- ("prop1" -> data("context[prop1]").toDouble) ~
- ("prop2" -> data("context[prop2]"))
- )) ~
- ("anotherPropertyA" -> data.get("anotherPropertyA").map(_.toDouble)) ~
- ("anotherPropertyB" -> data.get("anotherPropertyB").map(_.toBoolean))
- ))
- json
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
deleted file mode 100644
index 4d4b991..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/examplejson/ExampleJsonConnector.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.examplejson
-
-import io.prediction.data.webhooks.JsonConnector
-import io.prediction.data.webhooks.ConnectorException
-
-import org.json4s.Formats
-import org.json4s.DefaultFormats
-import org.json4s.JObject
-
-/** Example JsonConnector with following types of webhooks JSON input:
- *
- * UserAction
- *
- * {
- * "type": "userAction"
- * "userId": "as34smg4",
- * "event": "do_something",
- * "context": {
- * "ip": "24.5.68.47",
- * "prop1": 2.345,
- * "prop2": "value1"
- * },
- * "anotherProperty1": 100,
- * "anotherProperty2": "optional1",
- * "timestamp": "2015-01-02T00:30:12.984Z"
- * }
- *
- * UserActionItem
- *
- * {
- * "type": "userActionItem"
- * "userId": "as34smg4",
- * "event": "do_something_on",
- * "itemId": "kfjd312bc",
- * "context": {
- * "ip": "1.23.4.56",
- * "prop1": 2.345,
- * "prop2": "value1"
- * },
- * "anotherPropertyA": 4.567,
- * "anotherPropertyB": false,
- * "timestamp": "2015-01-15T04:20:23.567Z"
- * }
- */
-private[prediction] object ExampleJsonConnector extends JsonConnector {
-
- implicit val json4sFormats: Formats = DefaultFormats
-
- override def toEventJson(data: JObject): JObject = {
- val common = try {
- data.extract[Common]
- } catch {
- case e: Exception => throw new ConnectorException(
- s"Cannot extract Common field from ${data}. ${e.getMessage()}", e)
- }
-
- val json = try {
- common.`type` match {
- case "userAction" =>
- toEventJson(common = common, userAction = data.extract[UserAction])
- case "userActionItem" =>
- toEventJson(common = common, userActionItem = data.extract[UserActionItem])
- case x: String =>
- throw new ConnectorException(
- s"Cannot convert unknown type '${x}' to Event JSON.")
- }
- } catch {
- case e: ConnectorException => throw e
- case e: Exception => throw new ConnectorException(
- s"Cannot convert ${data} to eventJson. ${e.getMessage()}", e)
- }
-
- json
- }
-
- def toEventJson(common: Common, userAction: UserAction): JObject = {
- import org.json4s.JsonDSL._
-
- // map to EventAPI JSON
- val json =
- ("event" -> userAction.event) ~
- ("entityType" -> "user") ~
- ("entityId" -> userAction.userId) ~
- ("eventTime" -> userAction.timestamp) ~
- ("properties" -> (
- ("context" -> userAction.context) ~
- ("anotherProperty1" -> userAction.anotherProperty1) ~
- ("anotherProperty2" -> userAction.anotherProperty2)
- ))
- json
- }
-
- def toEventJson(common: Common, userActionItem: UserActionItem): JObject = {
- import org.json4s.JsonDSL._
-
- // map to EventAPI JSON
- val json =
- ("event" -> userActionItem.event) ~
- ("entityType" -> "user") ~
- ("entityId" -> userActionItem.userId) ~
- ("targetEntityType" -> "item") ~
- ("targetEntityId" -> userActionItem.itemId) ~
- ("eventTime" -> userActionItem.timestamp) ~
- ("properties" -> (
- ("context" -> userActionItem.context) ~
- ("anotherPropertyA" -> userActionItem.anotherPropertyA) ~
- ("anotherPropertyB" -> userActionItem.anotherPropertyB)
- ))
- json
- }
-
- // Common required fields
- case class Common(
- `type`: String
- )
-
- // User Actions fields
- case class UserAction (
- userId: String,
- event: String,
- context: Option[JObject],
- anotherProperty1: Int,
- anotherProperty2: Option[String],
- timestamp: String
- )
-
- // UserActionItem fields
- case class UserActionItem (
- userId: String,
- event: String,
- itemId: String,
- context: JObject,
- anotherPropertyA: Option[Double],
- anotherPropertyB: Option[Boolean],
- timestamp: String
- )
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
deleted file mode 100644
index b2793a0..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/mailchimp/MailChimpConnector.scala
+++ /dev/null
@@ -1,305 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package io.prediction.data.webhooks.mailchimp
-
-import io.prediction.data.webhooks.FormConnector
-import io.prediction.data.webhooks.ConnectorException
-import io.prediction.data.storage.EventValidation
-import io.prediction.data.Utils
-
-import org.json4s.JObject
-
-import org.joda.time.DateTime
-import org.joda.time.format.DateTimeFormat
-
-private[prediction] object MailChimpConnector extends FormConnector {
-
- override
- def toEventJson(data: Map[String, String]): JObject = {
-
- val json = data.get("type") match {
- case Some("subscribe") => subscribeToEventJson(data)
- // UNSUBSCRIBE
- case Some("unsubscribe") => unsubscribeToEventJson(data)
- // PROFILE UPDATES
- case Some("profile") => profileToEventJson(data)
- // EMAIL UPDATE
- case Some("upemail") => upemailToEventJson(data)
- // CLEANED EMAILS
- case Some("cleaned") => cleanedToEventJson(data)
- // CAMPAIGN SENDING STATUS
- case Some("campaign") => campaignToEventJson(data)
- // invalid type
- case Some(x) => throw new ConnectorException(
- s"Cannot convert unknown MailChimp data type ${x} to event JSON")
- case None => throw new ConnectorException(
- s"The field 'type' is required for MailChimp data.")
- }
- json
- }
-
-
- val mailChimpDateTimeFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
- .withZone(EventValidation.defaultTimeZone)
-
- def parseMailChimpDateTime(s: String): DateTime = {
- mailChimpDateTimeFormat.parseDateTime(s)
- }
-
- def subscribeToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- "type": "subscribe",
- "fired_at": "2009-03-26 21:35:57",
- "data[id]": "8a25ff1d98",
- "data[list_id]": "a6b5da1054",
- "data[email]": "api@mailchimp.com",
- "data[email_type]": "html",
- "data[merges][EMAIL]": "api@mailchimp.com",
- "data[merges][FNAME]": "MailChimp",
- "data[merges][LNAME]": "API",
- "data[merges][INTERESTS]": "Group1,Group2",
- "data[ip_opt]": "10.20.10.30",
- "data[ip_signup]": "10.20.10.30"
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- // TODO: handle optional fields
- val json =
- ("event" -> "subscribe") ~
- ("entityType" -> "user") ~
- ("entityId" -> data("data[id]")) ~
- ("targetEntityType" -> "list") ~
- ("targetEntityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("email" -> data("data[email]")) ~
- ("email_type" -> data("data[email_type]")) ~
- ("merges" -> (
- ("EMAIL" -> data("data[merges][EMAIL]")) ~
- ("FNAME" -> data("data[merges][FNAME]"))) ~
- ("LNAME" -> data("data[merges][LNAME]")) ~
- ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
- )) ~
- ("ip_opt" -> data("data[ip_opt]")) ~
- ("ip_signup" -> data("data[ip_signup]")
- ))
-
- json
-
- }
-
- def unsubscribeToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- "action" will either be "unsub" or "delete".
- The reason will be "manual" unless caused by a spam complaint - then it will be "abuse"
-
- "type": "unsubscribe",
- "fired_at": "2009-03-26 21:40:57",
- "data[action]": "unsub",
- "data[reason]": "manual",
- "data[id]": "8a25ff1d98",
- "data[list_id]": "a6b5da1054",
- "data[email]": "api+unsub@mailchimp.com",
- "data[email_type]": "html",
- "data[merges][EMAIL]": "api+unsub@mailchimp.com",
- "data[merges][FNAME]": "MailChimp",
- "data[merges][LNAME]": "API",
- "data[merges][INTERESTS]": "Group1,Group2",
- "data[ip_opt]": "10.20.10.30",
- "data[campaign_id]": "cb398d21d2",
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- val json =
- ("event" -> "unsubscribe") ~
- ("entityType" -> "user") ~
- ("entityId" -> data("data[id]")) ~
- ("targetEntityType" -> "list") ~
- ("targetEntityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("action" -> data("data[action]")) ~
- ("reason" -> data("data[reason]")) ~
- ("email" -> data("data[email]")) ~
- ("email_type" -> data("data[email_type]")) ~
- ("merges" -> (
- ("EMAIL" -> data("data[merges][EMAIL]")) ~
- ("FNAME" -> data("data[merges][FNAME]"))) ~
- ("LNAME" -> data("data[merges][LNAME]")) ~
- ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
- )) ~
- ("ip_opt" -> data("data[ip_opt]")) ~
- ("campaign_id" -> data("data[campaign_id]")
- ))
-
- json
-
- }
-
- def profileToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- "type": "profile",
- "fired_at": "2009-03-26 21:31:21",
- "data[id]": "8a25ff1d98",
- "data[list_id]": "a6b5da1054",
- "data[email]": "api@mailchimp.com",
- "data[email_type]": "html",
- "data[merges][EMAIL]": "api@mailchimp.com",
- "data[merges][FNAME]": "MailChimp",
- "data[merges][LNAME]": "API",
- "data[merges][INTERESTS]": "Group1,Group2", \\OPTIONAL
- "data[ip_opt]": "10.20.10.30"
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- val json =
- ("event" -> "profile") ~
- ("entityType" -> "user") ~
- ("entityId" -> data("data[id]")) ~
- ("targetEntityType" -> "list") ~
- ("targetEntityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("email" -> data("data[email]")) ~
- ("email_type" -> data("data[email_type]")) ~
- ("merges" -> (
- ("EMAIL" -> data("data[merges][EMAIL]")) ~
- ("FNAME" -> data("data[merges][FNAME]"))) ~
- ("LNAME" -> data("data[merges][LNAME]")) ~
- ("INTERESTS" -> data.get("data[merges][INTERESTS]"))
- )) ~
- ("ip_opt" -> data("data[ip_opt]")
- ))
-
- json
-
- }
-
- def upemailToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- "type": "upemail",
- "fired_at": "2009-03-26 22:15:09",
- "data[list_id]": "a6b5da1054",
- "data[new_id]": "51da8c3259",
- "data[new_email]": "api+new@mailchimp.com",
- "data[old_email]": "api+old@mailchimp.com"
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- val json =
- ("event" -> "upemail") ~
- ("entityType" -> "user") ~
- ("entityId" -> data("data[new_id]")) ~
- ("targetEntityType" -> "list") ~
- ("targetEntityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("new_email" -> data("data[new_email]")) ~
- ("old_email" -> data("data[old_email]"))
- ))
-
- json
-
- }
-
- def cleanedToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- Reason will be one of "hard" (for hard bounces) or "abuse"
- "type": "cleaned",
- "fired_at": "2009-03-26 22:01:00",
- "data[list_id]": "a6b5da1054",
- "data[campaign_id]": "4fjk2ma9xd",
- "data[reason]": "hard",
- "data[email]": "api+cleaned@mailchimp.com"
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- val json =
- ("event" -> "cleaned") ~
- ("entityType" -> "list") ~
- ("entityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("campaignId" -> data("data[campaign_id]")) ~
- ("reason" -> data("data[reason]")) ~
- ("email" -> data("data[email]"))
- ))
-
- json
-
- }
-
- def campaignToEventJson(data: Map[String, String]): JObject = {
-
- import org.json4s.JsonDSL._
-
- /*
- "type": "campaign",
- "fired_at": "2009-03-26 21:31:21",
- "data[id]": "5aa2102003",
- "data[subject]": "Test Campaign Subject",
- "data[status]": "sent",
- "data[reason]": "",
- "data[list_id]": "a6b5da1054"
- */
-
- // convert to ISO8601 format
- val eventTime = Utils.dateTimeToString(parseMailChimpDateTime(data("fired_at")))
-
- val json =
- ("event" -> "campaign") ~
- ("entityType" -> "campaign") ~
- ("entityId" -> data("data[id]")) ~
- ("targetEntityType" -> "list") ~
- ("targetEntityId" -> data("data[list_id]")) ~
- ("eventTime" -> eventTime) ~
- ("properties" -> (
- ("subject" -> data("data[subject]")) ~
- ("status" -> data("data[status]")) ~
- ("reason" -> data("data[reason]"))
- ))
-
- json
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala b/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
deleted file mode 100644
index 318043c..0000000
--- a/data/src/main/scala/io/prediction/data/webhooks/segmentio/SegmentIOConnector.scala
+++ /dev/null
@@ -1,306 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.prediction.data.webhooks.segmentio
-
-import io.prediction.data.webhooks.{ConnectorException, JsonConnector}
-import org.json4s._
-
-private[prediction] object SegmentIOConnector extends JsonConnector {
-
- // private lazy val supportedAPI = Vector("2", "2.0", "2.0.0")
-
- implicit val json4sFormats: Formats = DefaultFormats
-
- override
- def toEventJson(data: JObject): JObject = {
- try {
- val version: String = data.values("version").toString
-/*
- if (!supportedAPI.contains(version)) {
- throw new ConnectorException(
- s"Supported segment.io API versions: [2]. got [$version]"
- )
- }
-*/
- } catch { case _: Throwable \u21d2
- throw new ConnectorException(s"Failed to get segment.io API version.")
- }
-
- val common = try {
- data.extract[Common]
- } catch {
- case e: Throwable \u21d2 throw new ConnectorException(
- s"Cannot extract Common field from $data. ${e.getMessage}", e
- )
- }
-
- try {
- common.`type` match {
- case "identify" \u21d2
- toEventJson(
- common = common,
- identify = data.extract[Events.Identify]
- )
-
- case "track" \u21d2
- toEventJson(
- common = common,
- track = data.extract[Events.Track]
- )
-
- case "alias" \u21d2
- toEventJson(
- common = common,
- alias = data.extract[Events.Alias]
- )
-
- case "page" \u21d2
- toEventJson(
- common = common,
- page = data.extract[Events.Page]
- )
-
- case "screen" \u21d2
- toEventJson(
- common = common,
- screen = data.extract[Events.Screen]
- )
-
- case "group" \u21d2
- toEventJson(
- common = common,
- group = data.extract[Events.Group]
- )
-
- case _ \u21d2
- throw new ConnectorException(
- s"Cannot convert unknown type ${common.`type`} to event JSON."
- )
- }
- } catch {
- case e: ConnectorException => throw e
- case e: Exception =>
- throw new ConnectorException(
- s"Cannot convert $data to event JSON. ${e.getMessage}", e
- )
- }
- }
-
- def toEventJson(common: Common, identify: Events.Identify ): JObject = {
- import org.json4s.JsonDSL._
- val eventProperties = "traits" \u2192 identify.traits
- toJson(common, eventProperties)
- }
-
- def toEventJson(common: Common, track: Events.Track): JObject = {
- import org.json4s.JsonDSL._
- val eventProperties =
- ("properties" \u2192 track.properties) ~
- ("event" \u2192 track.event)
- toJson(common, eventProperties)
- }
-
- def toEventJson(common: Common, alias: Events.Alias): JObject = {
- import org.json4s.JsonDSL._
- toJson(common, "previous_id" \u2192 alias.previous_id)
- }
-
- def toEventJson(common: Common, screen: Events.Screen): JObject = {
- import org.json4s.JsonDSL._
- val eventProperties =
- ("name" \u2192 screen.name) ~
- ("properties" \u2192 screen.properties)
- toJson(common, eventProperties)
- }
-
- def toEventJson(common: Common, page: Events.Page): JObject = {
- import org.json4s.JsonDSL._
- val eventProperties =
- ("name" \u2192 page.name) ~
- ("properties" \u2192 page.properties)
- toJson(common, eventProperties)
- }
-
- def toEventJson(common: Common, group: Events.Group): JObject = {
- import org.json4s.JsonDSL._
- val eventProperties =
- ("group_id" \u2192 group.group_id) ~
- ("traits" \u2192 group.traits)
- toJson(common, eventProperties)
- }
-
- private def toJson(common: Common, props: JObject): JsonAST.JObject = {
- val commonFields = commonToJson(common)
- JObject(("properties" \u2192 properties(common, props)) :: commonFields.obj)
- }
-
- private def properties(common: Common, eventProps: JObject): JObject = {
- import org.json4s.JsonDSL._
- common.context map { context \u21d2
- try {
- ("context" \u2192 Extraction.decompose(context)) ~ eventProps
- } catch {
- case e: Throwable \u21d2
- throw new ConnectorException(
- s"Cannot convert $context to event JSON. ${e.getMessage }", e
- )
- }
- } getOrElse eventProps
- }
-
- private def commonToJson(common: Common): JObject =
- commonToJson(common, common.`type`)
-
- private def commonToJson(common: Common, typ: String): JObject = {
- import org.json4s.JsonDSL._
- common.user_id.orElse(common.anonymous_id) match {
- case Some(userId) \u21d2
- ("event" \u2192 typ) ~
- ("entityType" \u2192 "user") ~
- ("entityId" \u2192 userId) ~
- ("eventTime" \u2192 common.timestamp)
-
- case None \u21d2
- throw new ConnectorException(
- "there was no `userId` or `anonymousId` in the common fields."
- )
- }
- }
-}
-
-object Events {
-
- private[prediction] case class Track(
- event: String,
- properties: Option[JObject] = None
- )
-
- private[prediction] case class Alias(previous_id: String, user_id: String)
-
- private[prediction] case class Group(
- group_id: String,
- traits: Option[JObject] = None
- )
-
- private[prediction] case class Screen(
- name: Option[String] = None,
- properties: Option[JObject] = None
- )
-
- private[prediction] case class Page(
- name: Option[String] = None,
- properties: Option[JObject] = None
- )
-
- private[prediction] case class Identify(
- user_id: String,
- traits: Option[JObject]
- )
-
-}
-
-object Common {
-
- private[prediction] case class Integrations(
- All: Boolean = false,
- Mixpanel: Boolean = false,
- Marketo: Boolean = false,
- Salesforse: Boolean = false
- )
-
- private[prediction] case class Context(
- ip: String,
- library: Library,
- user_agent: String,
- app: Option[App] = None,
- campaign: Option[Campaign] = None,
- device: Option[Device] = None,
- network: Option[Network] = None,
- location: Option[Location] = None,
- os: Option[OS] = None,
- referrer: Option[Referrer] = None,
- screen: Option[Screen] = None,
- timezone: Option[String] = None
- )
-
- private[prediction] case class Screen(width: Int, height: Int, density: Int)
-
- private[prediction] case class Referrer(id: String, `type`: String)
-
- private[prediction] case class OS(name: String, version: String)
-
- private[prediction] case class Location(
- city: Option[String] = None,
- country: Option[String] = None,
- latitude: Option[Double] = None,
- longitude: Option[Double] = None,
- speed: Option[Int] = None
- )
-
- case class Page(
- path: String,
- referrer: String,
- search: String,
- title: String,
- url: String
- )
-
- private[prediction] case class Network(
- bluetooth: Option[Boolean] = None,
- carrier: Option[String] = None,
- cellular: Option[Boolean] = None,
- wifi: Option[Boolean] = None
- )
-
- private[prediction] case class Library(name: String, version: String)
-
- private[prediction] case class Device(
- id: Option[String] = None,
- advertising_id: Option[String] = None,
- ad_tracking_enabled: Option[Boolean] = None,
- manufacturer: Option[String] = None,
- model: Option[String] = None,
- name: Option[String] = None,
- `type`: Option[String] = None,
- token: Option[String] = None
- )
-
- private[prediction] case class Campaign(
- name: Option[String] = None,
- source: Option[String] = None,
- medium: Option[String] = None,
- term: Option[String] = None,
- content: Option[String] = None
- )
-
- private[prediction] case class App(
- name: Option[String] = None,
- version: Option[String] = None,
- build: Option[String] = None
- )
-
-}
-
-private[prediction] case class Common(
- `type`: String,
- sent_at: String,
- timestamp: String,
- version: String,
- anonymous_id: Option[String] = None,
- user_id: Option[String] = None,
- context: Option[Common.Context] = None,
- integrations: Option[Common.Integrations] = None
-)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/Utils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/Utils.scala b/data/src/main/scala/org/apache/predictionio/data/Utils.scala
new file mode 100644
index 0000000..db8c7a2
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/Utils.scala
@@ -0,0 +1,50 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data
+
+import org.joda.time.DateTime
+import org.joda.time.format.ISODateTimeFormat
+
+import java.lang.IllegalArgumentException
+
+private[prediction] object Utils {
+
+ // use dateTime() for strict ISO8601 format
+ val dateTimeFormatter = ISODateTimeFormat.dateTime().withOffsetParsed()
+
+ val dateTimeNoMillisFormatter =
+ ISODateTimeFormat.dateTimeNoMillis().withOffsetParsed()
+
+ def stringToDateTime(dt: String): DateTime = {
+ // We accept two formats.
+ // 1. "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
+ // 2. "yyyy-MM-dd'T'HH:mm:ssZZ"
+ // The first one also takes milliseconds into account.
+ try {
+ // formatting for "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"
+ dateTimeFormatter.parseDateTime(dt)
+ } catch {
+ case e: IllegalArgumentException => {
+ // handle when the datetime string doesn't specify milliseconds.
+ dateTimeNoMillisFormatter.parseDateTime(dt)
+ }
+ }
+ }
+
+ def dateTimeToString(dt: DateTime): String = dateTimeFormatter.print(dt)
+ // dt.toString
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
new file mode 100644
index 0000000..c380daa
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala
@@ -0,0 +1,80 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.webhooks.ConnectorException
+import org.apache.predictionio.data.storage.StorageException
+
+import spray.routing._
+import spray.routing.Directives._
+import spray.routing.Rejection
+import spray.http.StatusCodes
+import spray.http.StatusCode
+import spray.httpx.Json4sSupport
+
+import org.json4s.Formats
+import org.json4s.DefaultFormats
+
+object Common {
+
+ object Json4sProtocol extends Json4sSupport {
+ implicit def json4sFormats: Formats = DefaultFormats
+ }
+
+ import Json4sProtocol._
+
+ val rejectionHandler = RejectionHandler {
+ case MalformedRequestContentRejection(msg, _) :: _ =>
+ complete(StatusCodes.BadRequest, Map("message" -> msg))
+ case MissingQueryParamRejection(msg) :: _ =>
+ complete(StatusCodes.NotFound,
+ Map("message" -> s"missing required query parameter ${msg}."))
+ case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => {
+ val msg = cause match {
+ case AuthenticationFailedRejection.CredentialsRejected =>
+ "Invalid accessKey."
+ case AuthenticationFailedRejection.CredentialsMissing =>
+ "Missing accessKey."
+ }
+ complete(StatusCodes.Unauthorized, challengeHeaders, Map("message" -> msg))
+ }
+ case ChannelRejection(msg) :: _ =>
+ complete(StatusCodes.Unauthorized, Map("message" -> msg))
+ case NonExistentAppRejection(msg) :: _ =>
+ complete(StatusCodes.Unauthorized, Map("message" -> msg))
+ }
+
+ val exceptionHandler = ExceptionHandler {
+ case e: ConnectorException => {
+ val msg = s"${e.getMessage()}"
+ complete(StatusCodes.BadRequest, Map("message" -> msg))
+ }
+ case e: StorageException => {
+ val msg = s"${e.getMessage()}"
+ complete(StatusCodes.InternalServerError, Map("message" -> msg))
+ }
+ case e: Exception => {
+ val msg = s"${e.getMessage()}"
+ complete(StatusCodes.InternalServerError, Map("message" -> msg))
+ }
+ }
+}
+
+/** invalid channel */
+case class ChannelRejection(msg: String) extends Rejection
+
+/** the app doesn't exist */
+case class NonExistentAppRejection(msg: String) extends Rejection
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
new file mode 100644
index 0000000..e25234f
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventInfo.scala
@@ -0,0 +1,24 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.api
+
+import org.apache.predictionio.data.storage.Event
+
+case class EventInfo(
+ appId: Int,
+ channelId: Option[Int],
+ event: Event)
+