You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by em...@apache.org on 2018/10/25 17:20:29 UTC
[predictionio] branch develop updated: [PIO-189] fix ES6 integration test (#488)
This is an automated email from the ASF dual-hosted git repository.
emergentorder pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/predictionio.git
The following commit(s) were added to refs/heads/develop by this push:
new d71c423 [PIO-189] fix ES6 integration test (#488)
d71c423 is described below
commit d71c4230ae598e1647136c1bff1d646a79874cdf
Author: Alex Merritt <le...@gmail.com>
AuthorDate: Thu Oct 25 12:20:24 2018 -0500
[PIO-189] fix ES6 integration test (#488)
* [PIO-189] fix ES6 integration test
---
.../predictionio/data/storage/elasticsearch/ESLEvents.scala | 10 +++++++++-
.../predictionio/data/storage/elasticsearch/ESPEvents.scala | 5 ++++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 185be92..f275ec9 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -38,7 +38,7 @@ import org.json4s.ext.JodaTimeSerializers
import grizzled.slf4j.Logging
import org.apache.http.message.BasicHeader
-class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String)
extends LEvents with Logging {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
@@ -52,6 +52,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
ESUtils.createIndex(client, index,
ESUtils.getNumberOfShards(config, index.toUpperCase),
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
@@ -77,6 +78,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val json =
("query" ->
@@ -107,6 +109,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
Future {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val id = event.eventId.getOrElse {
ESEventsUtil.getBase64UUID
@@ -152,6 +155,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
Future {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val ids = events.map { event =>
event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
@@ -214,6 +218,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
}
private def exists(client: RestClient, estype: String, id: Int): Boolean = {
+ val index = baseIndex + "_" + estype
try {
client.performRequest(
"GET",
@@ -242,6 +247,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
Future {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val json =
("query" ->
@@ -275,6 +281,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
Future {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val json =
("query" ->
@@ -311,6 +318,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
Future {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
try {
val query = ESUtils.createEventQuery(
startTime, untilTime, entityType, entityId,
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 75f7639..a86d378 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._
import org.json4s.ext.JodaTimeSerializers
-class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String)
extends PEvents {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
@@ -78,6 +78,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
eventNames, targetEntityType, targetEntityId, None)
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
val conf = new Configuration()
conf.set("es.resource", s"$index/$estype")
conf.set("es.query", query)
@@ -97,6 +98,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
events: RDD[Event],
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes())
events.map { event =>
ESEventsUtil.eventToPut(event, appId)
@@ -107,6 +109,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
eventIds: RDD[String],
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
val estype = getEsType(appId, channelId)
+ val index = baseIndex + "_" + estype
eventIds.foreachPartition { iter =>
iter.foreach { eventId =>
try {