You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by em...@apache.org on 2018/10/25 17:20:29 UTC

[predictionio] branch develop updated: [PIO-189] fix ES6 integration test (#488)

This is an automated email from the ASF dual-hosted git repository.

emergentorder pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git


The following commit(s) were added to refs/heads/develop by this push:
     new d71c423  [PIO-189] fix ES6 integration test (#488)
d71c423 is described below

commit d71c4230ae598e1647136c1bff1d646a79874cdf
Author: Alex Merritt <le...@gmail.com>
AuthorDate: Thu Oct 25 12:20:24 2018 -0500

    [PIO-189] fix ES6 integration test (#488)
    
    * [PIO-189] fix ES6 integration test
---
 .../predictionio/data/storage/elasticsearch/ESLEvents.scala    | 10 +++++++++-
 .../predictionio/data/storage/elasticsearch/ESPEvents.scala    |  5 ++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 185be92..f275ec9 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -38,7 +38,7 @@ import org.json4s.ext.JodaTimeSerializers
 import grizzled.slf4j.Logging
 import org.apache.http.message.BasicHeader
 
-class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String)
     extends LEvents with Logging {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
@@ -52,6 +52,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
 
   override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     ESUtils.createIndex(client, index,
       ESUtils.getNumberOfShards(config, index.toUpperCase),
       ESUtils.getNumberOfReplicas(config, index.toUpperCase))
@@ -77,6 +78,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
 
   override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     try {
       val json =
         ("query" ->
@@ -107,6 +109,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val id = event.eventId.getOrElse {
           ESEventsUtil.getBase64UUID
@@ -152,6 +155,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val ids = events.map { event =>
           event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
@@ -214,6 +218,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
   }
 
   private def exists(client: RestClient, estype: String, id: Int): Boolean = {
+    val index = baseIndex + "_" + estype
     try {
       client.performRequest(
         "GET",
@@ -242,6 +247,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val json =
           ("query" ->
@@ -275,6 +281,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val json =
           ("query" ->
@@ -311,6 +318,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
     (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
     Future {
       val estype = getEsType(appId, channelId)
+      val index = baseIndex + "_" + estype
       try {
         val query = ESUtils.createEventQuery(
           startTime, untilTime, entityType, entityId,
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 75f7639..a86d378 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._
 import org.json4s.ext.JodaTimeSerializers
 
 
-class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String)
     extends PEvents {
   implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
 
@@ -78,6 +78,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
       eventNames, targetEntityType, targetEntityId, None)
 
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     val conf = new Configuration()
     conf.set("es.resource", s"$index/$estype")
     conf.set("es.query", query)
@@ -97,6 +98,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
     events: RDD[Event],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
     val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
     events.map { event =>
       ESEventsUtil.eventToPut(event, appId)
@@ -107,6 +109,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
     eventIds: RDD[String],
     appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
     val estype = getEsType(appId, channelId)
+    val index = baseIndex + "_" + estype
       eventIds.foreachPartition { iter =>
         iter.foreach { eventId =>
           try {