You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by ta...@apache.org on 2018/10/29 14:15:52 UTC

[predictionio] branch develop updated: [PIO-182] Add async methods to LEventStore (#482)

This is an automated email from the ASF dual-hosted git repository.

takezoe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git


The following commit(s) were added to refs/heads/develop by this push:
     new a479fe3  [PIO-182] Add async methods to LEventStore (#482)
a479fe3 is described below

commit a479fe3e8edf087bc51037d4ec5118b5f6e9c33a
Author: Naoki Takezoe <ta...@gmail.com>
AuthorDate: Mon Oct 29 23:15:45 2018 +0900

    [PIO-182] Add async methods to LEventStore (#482)
---
 .../predictionio/data/store/LEventStore.scala      | 138 +++++++++++++++++++--
 .../data/store/java/LJavaEventStore.scala          | 134 +++++++++++++++++++-
 2 files changed, 262 insertions(+), 10 deletions(-)

diff --git a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
index 3a82e98..a73ee80 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/LEventStore.scala
@@ -20,15 +20,30 @@ package org.apache.predictionio.data.store
 
 import org.apache.predictionio.data.storage.Storage
 import org.apache.predictionio.data.storage.Event
-
 import org.joda.time.DateTime
 
-import scala.concurrent.Await
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration.Duration
 
 /** This object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that the blocking methods of this object use
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this thread pool has as many threads as there are available
+  * processors, parallelism is limited to the number of processors.
+  *
+  * If this becomes a bottleneck, use the `*Async` methods with your own ExecutionContext,
+  * or increase the thread count by setting these JVM options before running "pio deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala documentation:
+  * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LEventStore {
 
@@ -72,9 +87,62 @@ object LEventStore {
     latest: Boolean = true,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findByEntityAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit,
+      latest = latest),
+      timeout)
+  }
+
+  /** Reads events of the specified entity. May use this in Algorithm's predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first (default true)
+    * @return Future[Iterator[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None,
+    latest: Boolean = true)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -85,8 +153,7 @@ object LEventStore {
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
       limit = limit,
-      reversed = Some(latest)),
-      timeout)
+      reversed = Some(latest))
   }
 
   /** Reads events generically. If entityType or entityId is not specified, it
@@ -127,9 +194,62 @@ object LEventStore {
     limit: Option[Int] = None,
     timeout: Duration = defaultTimeout): Iterator[Event] = {
 
+    // Import here to ensure ExecutionContext.Implicits.global is used only in this method
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    Await.result(findAsync(
+      appName = appName,
+      entityType = entityType,
+      entityId = entityId,
+      channelName = channelName,
+      eventNames = eventNames,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      startTime = startTime,
+      untilTime = untilTime,
+      limit = limit), timeout)
+  }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return Future[Iterator[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    channelName: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    limit: Option[Int] = None)(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+
     val (appId, channelId) = Common.appNameToId(appName, channelName)
 
-    Await.result(eventsDb.futureFind(
+    eventsDb.futureFind(
       appId = appId,
       channelId = channelId,
       startTime = startTime,
@@ -139,7 +259,7 @@ object LEventStore {
       eventNames = eventNames,
       targetEntityType = targetEntityType,
       targetEntityId = targetEntityId,
-      limit = limit), timeout)
+      limit = limit)
   }
 
 }
diff --git a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
index f4fd676..6f39feb 100644
--- a/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/store/java/LJavaEventStore.scala
@@ -18,15 +18,35 @@
 
 package org.apache.predictionio.data.store.java
 
+import java.util.concurrent.{CompletableFuture, CompletionStage, ExecutorService}
+
 import org.apache.predictionio.data.storage.Event
 import org.apache.predictionio.data.store.LEventStore
 import org.joda.time.DateTime
 
 import scala.collection.JavaConversions
 import scala.concurrent.duration.Duration
+import scala.compat.java8.FutureConverters._
 
 /** This Java-friendly object provides a set of operation to access Event Store
-  * without going through Spark's parallelization
+  * without going through Spark's parallelization.
+  *
+  * Note that the blocking methods of this object use
+  * `scala.concurrent.ExecutionContext.Implicits.global` internally.
+  * Since this thread pool has as many threads as there are available
+  * processors, parallelism is limited to the number of processors.
+  *
+  * If this becomes a bottleneck, use the `*Async` methods with your own ExecutorService,
+  * or increase the thread count by setting these JVM options before running "pio deploy":
+  *
+  * <pre>
+  * export JAVA_OPTS="$JAVA_OPTS \
+  *   -Dscala.concurrent.context.numThreads=1000 \
+  *   -Dscala.concurrent.context.maxThreads=1000"
+  * </pre>
+  *
+  * You can learn more about the global execution context in the Scala documentation:
+  * [[https://docs.scala-lang.org/overviews/core/futures.html#the-global-execution-context]]
   */
 object LJavaEventStore {
 
@@ -86,6 +106,61 @@ object LJavaEventStore {
       ).toSeq)
   }
 
+  /** Reads events of the specified entity. May use this in Algorithm's predict()
+    * or Serving logic to have fast event store access.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    * @param entityId return events of this entityId
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @param latest Return latest event first
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findByEntityAsync(
+    appName: String,
+    entityType: String,
+    entityId: String,
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    latest: Boolean,
+    executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findByEntityAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt,
+      latest
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+  }
+
   /** Reads events generically. If entityType or entityId is not specified, it
     * results in table scan.
     *
@@ -142,4 +217,61 @@ object LJavaEventStore {
         timeout
       ).toSeq)
   }
+
+  /** Reads events generically. If entityType or entityId is not specified, it
+    * results in table scan.
+    *
+    * @param appName return events of this app
+    * @param entityType return events of this entityType
+    *   - None means no restriction on entityType
+    *   - Some(x) means entityType should match x.
+    * @param entityId return events of this entityId
+    *   - None means no restriction on entityId
+    *   - Some(x) means entityId should match x.
+    * @param channelName return events of this channel (default channel if it's None)
+    * @param eventNames return events with any of these event names.
+    * @param targetEntityType return events of this targetEntityType:
+    *   - None means no restriction on targetEntityType
+    *   - Some(None) means no targetEntityType for this event
+    *   - Some(Some(x)) means targetEntityType should match x.
+    * @param targetEntityId return events of this targetEntityId
+    *   - None means no restriction on targetEntityId
+    *   - Some(None) means no targetEntityId for this event
+    *   - Some(Some(x)) means targetEntityId should match x.
+    * @param startTime return events with eventTime >= startTime
+    * @param untilTime return events with eventTime < untilTime
+    * @param limit Limit number of events. Get all events if None or Some(-1)
+    * @return CompletableFuture[java.util.List[Event]]
+    */
+  def findAsync(
+    appName: String,
+    entityType: Option[String],
+    entityId: Option[String],
+    channelName: Option[String],
+    eventNames: Option[java.util.List[String]],
+    targetEntityType: Option[Option[String]],
+    targetEntityId: Option[Option[String]],
+    startTime: Option[DateTime],
+    untilTime: Option[DateTime],
+    limit: Option[Integer],
+    executorService: ExecutorService): CompletableFuture[java.util.List[Event]] = {
+
+    val eventNamesSeq = eventNames.map(JavaConversions.asScalaBuffer(_).toSeq)
+    val limitInt = limit.map(_.intValue())
+    implicit val ec = fromExecutorService(executorService)
+
+    LEventStore.findAsync(
+      appName,
+      entityType,
+      entityId,
+      channelName,
+      eventNamesSeq,
+      targetEntityType,
+      targetEntityId,
+      startTime,
+      untilTime,
+      limitInt
+    ).map { x => JavaConversions.seqAsJavaList(x.toSeq) }.toJava.toCompletableFuture
+  }
+
 }