You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ta...@apache.org on 2018/10/29 14:15:52 UTC
[predictionio] branch develop updated: [PIO-182] Add async methods
to LEventStore (#482)
This is an automated email from the ASF dual-hosted git repository.
takezoe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git
The following commit(s) were added to refs/heads/develop by this push:
new a479fe3 [PIO-182] Add async methods to LEventStore (#482)
a479fe3 is described below
commit a479fe3e8edf087bc51037d4ec5118b5f6e9c33a
Author: Naoki Takezoe <ta...@gmail.com>
AuthorDate: Mon Oct 29 23:15:45 2018 +0900
[PIO-182] Add async methods to LEventStore (#482)
---
.../predictionio/data/store/LEventStore.scala | 138 +++++++++++++++++++--
.../data/store/java/LJavaEventStore.scala | 134 +++++++++++++++++++-
2 files changed, 262 insertions(+), 10 deletions(-)
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
index 3a82e98..a73ee80 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -20,15 +20,30 @@ package org.apache.predictionio.data.store
import org.apache.predictionio.data.storage.Storage
import org.apache.predictionio.data.storage.Event
-
import org.joda.time.DateTime
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
/** This object provides a set of operation to access Event Store
- * without going through Spark's parallelization
+ * without going through Spark's parallelization.
+ *
+ * Note that blocking methods of this object use
+ * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+ * Since this is a thread pool which has a number of threads equal to available
+ * processors, parallelism is limited up to the number of processors.
+ *
+ * If this limitation becomes a bottleneck for resource usage, you can increase the
+ * number of threads by declaring the following VM options before calling "pio deploy":
+ *
+ * <pre>
+ * export JAVA_OPTS="$JAVA_OPTS \
+ * -Dscala.concurrent.context.numThreads=1000 \
+ * -Dscala.concurrent.context.maxThreads=1000"
+ * </pre>
+ *
+ * You can learn more about the global execution context in the Scala documentation:
+ * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
*/
object LEventStore {
@@ -72,9 +87,62 @@ object LEventStore {
latest: Boolean = true,
timeout: Duration = defaultTimeout): Iterator[Event] = {
+ // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ Await.result(findByEntityAsync(
+ appName = appName,
+ entityType = entityType,
+ entityId = entityId,
+ channelName = channelName,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ startTime = startTime,
+ untilTime = untilTime,
+ limit = limit,
+ latest = latest),
+ timeout)
+ }
+
+ /** Reads events of the specified entity and returns them as a Future. Use this in an
+ * Algorithm's predict() or Serving logic for fast event store access.
+ *
+ * @param appName return events of this app
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param channelName return events of this channel (default channel if it's None)
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @param latest Return latest event first (default true)
+ * @return Future[Iterator[Event]]
+ */
+ def findByEntityAsync(
+ appName: String,
+ entityType: String,
+ entityId: String,
+ channelName: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ limit: Option[Int] = None,
+ latest: Boolean = true)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
val (appId, channelId) = Common.appNameToId(appName, channelName)
- Await.result(eventsDb.futureFind(
+ eventsDb.futureFind(
appId = appId,
channelId = channelId,
startTime = startTime,
@@ -85,8 +153,7 @@ object LEventStore {
targetEntityType = targetEntityType,
targetEntityId = targetEntityId,
limit = limit,
- reversed = Some(latest)),
- timeout)
+ reversed = Some(latest))
}
/** Reads events generically. If entityType or entityId is not specified, it
@@ -127,9 +194,62 @@ object LEventStore {
limit: Option[Int] = None,
timeout: Duration = defaultTimeout): Iterator[Event] = {
+ // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+ import scala.concurrent.ExecutionContext.Implicits.global
+
+ Await.result(findAsync(
+ appName = appName,
+ entityType = entityType,
+ entityId = entityId,
+ channelName = channelName,
+ eventNames = eventNames,
+ targetEntityType = targetEntityType,
+ targetEntityId = targetEntityId,
+ startTime = startTime,
+ untilTime = untilTime,
+ limit = limit), timeout)
+ }
+
+ /** Reads events generically and returns them as a Future. If entityType or entityId
+ * is not specified, it results in a table scan.
+ *
+ * @param appName return events of this app
+ * @param entityType return events of this entityType
+ * - None means no restriction on entityType
+ * - Some(x) means entityType should match x.
+ * @param entityId return events of this entityId
+ * - None means no restriction on entityId
+ * - Some(x) means entityId should match x.
+ * @param channelName return events of this channel (default channel if it's None)
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @return Future[Iterator[Event]]
+ */
+ def findAsync(
+ appName: String,
+ entityType: Option[String] = None,
+ entityId: Option[String] = None,
+ channelName: Option[String] = None,
+ eventNames: Option[Seq[String]] = None,
+ targetEntityType: Option[Option[String]] = None,
+ targetEntityId: Option[Option[String]] = None,
+ startTime: Option[DateTime] = None,
+ untilTime: Option[DateTime] = None,
+ limit: Option[Int] = None)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
val (appId, channelId) = Common.appNameToId(appName, channelName)
- Await.result(eventsDb.futureFind(
+ eventsDb.futureFind(
appId = appId,
channelId = channelId,
startTime = startTime,
@@ -139,7 +259,7 @@ object LEventStore {
eventNames = eventNames,
targetEntityType = targetEntityType,
targetEntityId = targetEntityId,
- limit = limit), timeout)
+ limit = limit)
}
}
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
index f4fd676..6f39feb 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -18,15 +18,35 @@
package org.apache.predictionio.data.store.java
+import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutorService}
+
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.store.LEventStore
import org.joda.time.DateTime
import scala.collection.JavaConversions
import scala.concurrent.duration.Duration
+import scala.compat.java8.FutureConverters._
/** This Java-friendly object provides a set of operation to access Event Store
- * without going through Spark's parallelization
+ * without going through Spark's parallelization.
+ *
+ * Note that blocking methods of this object use
+ * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+ * Since this is a thread pool which has a number of threads equal to available
+ * processors, parallelism is limited up to the number of processors.
+ *
+ * If this limitation becomes a bottleneck for resource usage, you can increase the
+ * number of threads by declaring the following VM options before calling "pio deploy":
+ *
+ * <pre>
+ * export JAVA_OPTS="$JAVA_OPTS \
+ * -Dscala.concurrent.context.numThreads=1000 \
+ * -Dscala.concurrent.context.maxThreads=1000"
+ * </pre>
+ *
+ * You can learn more about the global execution context in the Scala documentation:
+ * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
*/
object LJavaEventStore {
@@ -86,6 +106,61 @@ object LJavaEventStore {
).toSeq)
}
+ /** Reads events of the specified entity and returns them as a CompletableFuture. Use this
+ * in an Algorithm's predict() or Serving logic for fast event store access.
+ *
+ * @param appName return events of this app
+ * @param entityType return events of this entityType
+ * @param entityId return events of this entityId
+ * @param channelName return events of this channel (default channel if it's None)
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @param latest Return latest event first
+ * @return CompletableFuture[java.util.List[Event]]
+ */
+ def findByEntityAsync(
+ appName: String,
+ entityType: String,
+ entityId: String,
+ channelName: Option[String],
+ eventNames: Option[java.util.List[String]],
+ targetEntityType: Option[Option[String]],
+ targetEntityId: Option[Option[String]],
+ startTime: Option[DateTime],
+ untilTime: Option[DateTime],
+ limit: Option[Integer],
+ latest: Boolean,
+ executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+ val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+ val limitInt = limit.map(_.intValue())
+ implicit val ec = fromExecutorService(executorService)
+
+ LEventStore.findByEntityAsync(
+ appName,
+ entityType,
+ entityId,
+ channelName,
+ eventNamesSeq,
+ targetEntityType,
+ targetEntityId,
+ startTime,
+ untilTime,
+ limitInt,
+ latest
+ ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+ }
+
/** Reads events generically. If entityType or entityId is not specified, it
* results in table scan.
*
@@ -142,4 +217,61 @@ object LJavaEventStore {
timeout
).toSeq)
}
+
+ /** Reads events generically and returns them as a CompletableFuture. If entityType or
+ * entityId is not specified, it results in a table scan.
+ *
+ * @param appName return events of this app
+ * @param entityType return events of this entityType
+ * - None means no restriction on entityType
+ * - Some(x) means entityType should match x.
+ * @param entityId return events of this entityId
+ * - None means no restriction on entityId
+ * - Some(x) means entityId should match x.
+ * @param channelName return events of this channel (default channel if it's None)
+ * @param eventNames return events with any of these event names.
+ * @param targetEntityType return events of this targetEntityType:
+ * - None means no restriction on targetEntityType
+ * - Some(None) means no targetEntityType for this event
+ * - Some(Some(x)) means targetEntityType should match x.
+ * @param targetEntityId return events of this targetEntityId
+ * - None means no restriction on targetEntityId
+ * - Some(None) means no targetEntityId for this event
+ * - Some(Some(x)) means targetEntityId should match x.
+ * @param startTime return events with eventTime >= startTime
+ * @param untilTime return events with eventTime < untilTime
+ * @param limit Limit number of events. Get all events if None or Some(-1)
+ * @return CompletableFuture[java.util.List[Event]]
+ */
+ def findAsync(
+ appName: String,
+ entityType: Option[String],
+ entityId: Option[String],
+ channelName: Option[String],
+ eventNames: Option[java.util.List[String]],
+ targetEntityType: Option[Option[String]],
+ targetEntityId: Option[Option[String]],
+ startTime: Option[DateTime],
+ untilTime: Option[DateTime],
+ limit: Option[Integer],
+ executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+ val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+ val limitInt = limit.map(_.intValue())
+ implicit val ec = fromExecutorService(executorService)
+
+ LEventStore.findAsync(
+ appName,
+ entityType,
+ entityId,
+ channelName,
+ eventNamesSeq,
+ targetEntityType,
+ targetEntityId,
+ startTime,
+ untilTime,
+ limitInt
+ ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+ }
+
}