You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@predictionio.apache.org by do...@apache.org on 2017/02/11 22:00:10 UTC
[03/10] incubator-predictionio git commit: add ESClient to close
RestClient
add ESClient to close RestClient
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/d4e75ab5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/d4e75ab5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/d4e75ab5
Branch: refs/heads/feature/es5
Commit: d4e75ab5441c2d7278c0cc55c6f1d3b51c9479a0
Parents: 36b79d7
Author: Shinsuke Sugaya <sh...@yahoo.co.jp>
Authored: Mon Jan 16 17:28:54 2017 +0900
Committer: Shinsuke Sugaya <sh...@yahoo.co.jp>
Committed: Mon Jan 16 17:28:54 2017 +0900
----------------------------------------------------------------------
.../predictionio/workflow/CreateWorkflow.scala | 74 +++++++--------
.../storage/elasticsearch/ESAccessKeys.scala | 48 +++++++---
.../data/storage/elasticsearch/ESApps.scala | 48 +++++++---
.../data/storage/elasticsearch/ESChannels.scala | 42 ++++++---
.../elasticsearch/ESEngineInstances.scala | 75 ++++++++++-----
.../elasticsearch/ESEngineManifests.scala | 23 +++--
.../elasticsearch/ESEvaluationInstances.scala | 62 +++++++-----
.../data/storage/elasticsearch/ESLEvents.scala | 99 ++++++++++++--------
.../data/storage/elasticsearch/ESPEvents.scala | 12 +--
.../storage/elasticsearch/ESSequences.scala | 22 +++--
.../data/storage/elasticsearch/ESUtils.scala | 6 +-
.../storage/elasticsearch/StorageClient.scala | 24 +++--
12 files changed, 330 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
index edfc1b6..899ace2 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala
@@ -223,40 +223,36 @@ object CreateWorkflow extends Logging {
engineFactoryObj.engineParams(wfc.engineParamsKey)
}
- try {
- val engineInstance = EngineInstance(
- id = "",
- status = "INIT",
- startTime = DateTime.now,
- endTime = DateTime.now,
- engineId = wfc.engineId,
- engineVersion = wfc.engineVersion,
- engineVariant = variantId,
- engineFactory = engineFactory,
- batch = wfc.batch,
- env = pioEnvVars,
- sparkConf = workflowParams.sparkEnv,
- dataSourceParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
- preparatorParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
- algorithmsParams =
- JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
- servingParams =
- JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
+ val engineInstance = EngineInstance(
+ id = "",
+ status = "INIT",
+ startTime = DateTime.now,
+ endTime = DateTime.now,
+ engineId = wfc.engineId,
+ engineVersion = wfc.engineVersion,
+ engineVariant = variantId,
+ engineFactory = engineFactory,
+ batch = wfc.batch,
+ env = pioEnvVars,
+ sparkConf = workflowParams.sparkEnv,
+ dataSourceParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams),
+ preparatorParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams),
+ algorithmsParams =
+ JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList),
+ servingParams =
+ JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams))
- val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
- engineInstance)
+ val engineInstanceId = Storage.getMetaDataEngineInstances.insert(
+ engineInstance)
- CoreWorkflow.runTrain(
- env = pioEnvVars,
- params = workflowParams,
- engine = trainableEngine,
- engineParams = engineParams,
- engineInstance = engineInstance.copy(id = engineInstanceId))
- } finally {
- Storage.getLEvents().close()
- }
+ CoreWorkflow.runTrain(
+ env = pioEnvVars,
+ params = workflowParams,
+ engine = trainableEngine,
+ engineParams = engineParams,
+ engineInstance = engineInstance.copy(id = engineInstanceId))
} else {
val workflowParams = WorkflowParams(
verbose = wfc.verbosity,
@@ -271,15 +267,11 @@ object CreateWorkflow extends Logging {
env = pioEnvVars,
sparkConf = workflowParams.sparkEnv
)
- try {
- Workflow.runEvaluation(
- evaluation = evaluation.get,
- engineParamsGenerator = engineParamsGenerator.get,
- evaluationInstance = evaluationInstance,
- params = workflowParams)
- } finally {
- Storage.getLEvents().close()
- }
+ Workflow.runEvaluation(
+ evaluation = evaluation.get,
+ engineParamsGenerator = engineParamsGenerator.get,
+ evaluationInstance = evaluationInstance,
+ params = workflowParams)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 2c69cf4..9156fab 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -37,19 +37,24 @@ import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
extends AccessKeys with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "accesskeys"
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0)) ~
- ("properties" ->
- ("key" -> ("type" -> "keyword")) ~
- ("events" -> ("type" -> "keyword"))))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("key" -> ("type" -> "keyword")) ~
+ ("events" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def insert(accessKey: AccessKey): Option[String] = {
val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -58,8 +63,9 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
}
def get(id: String): Option[AccessKey] = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -81,41 +87,50 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
case e: IOException =>
error("Failed to access to /$index/$estype/$key", e)
None
+ } finally {
+ restClient.close()
}
}
def getAll(): Seq[AccessKey] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+ ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def getByAppid(appid: Int): Seq[AccessKey] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("appid" -> appid)))
- ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
+ ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def update(accessKey: AccessKey): Unit = {
val id = accessKey.key
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -131,12 +146,15 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
def delete(id: String): Unit = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -150,6 +168,8 @@ class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: Strin
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/id", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 7a65379..0379c90 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -37,20 +37,25 @@ import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
/** Elasticsearch implementation of Items. */
-class ESApps(client: RestClient, config: StorageClientConfig, index: String)
+class ESApps(client: ESClient, config: StorageClientConfig, index: String)
extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
private val seq = new ESSequences(client, config, index)
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0))~
- ("properties" ->
- ("id" -> ("type" -> "keyword")) ~
- ("name" -> ("type" -> "keyword"))))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("id" -> ("type" -> "keyword")) ~
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def insert(app: App): Option[Int] = {
val id =
@@ -64,8 +69,9 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
}
def get(id: Int): Option[App] = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -87,17 +93,20 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
+ } finally {
+ restClient.close()
}
}
def getByName(name: String): Option[App] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("name" -> name)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/_search",
Map.empty[String, String].asJava,
@@ -114,27 +123,33 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
None
+ } finally {
+ restClient.close()
}
}
def getAll(): Seq[App] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[App](client, index, estype, compact(render(json)))
+ ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def update(app: App): Unit = {
val id = app.id.toString
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -150,12 +165,15 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
def delete(id: Int): Unit = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -169,6 +187,8 @@ class ESApps(client: RestClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/id", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index c90d668..b319c26 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -36,19 +36,24 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
+class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
private val seq = new ESSequences(client, config, index)
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0))~
- ("properties" ->
- ("name" -> ("type" -> "keyword"))))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def insert(channel: Channel): Option[Int] = {
val id =
@@ -62,8 +67,9 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
}
def get(id: Int): Option[Channel] = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -85,28 +91,34 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
+ } finally {
+ restClient.close()
}
}
def getByAppid(appid: Int): Seq[Channel] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("appid" -> appid)))
- ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
+ ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def update(channel: Channel): Boolean = {
val id = channel.id.toString
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -124,12 +136,15 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
false
+ } finally {
+ restClient.close()
}
}
def delete(id: Int): Unit = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -143,7 +158,8 @@ class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 08f87f3..68cdeac 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -37,30 +37,35 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
+class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0))~
- ("properties" ->
- ("status" -> ("type" -> "keyword")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("engineId" -> ("type" -> "keyword")) ~
- ("engineVersion" -> ("type" -> "keyword")) ~
- ("engineVariant" -> ("type" -> "keyword")) ~
- ("engineFactory" -> ("type" -> "keyword")) ~
- ("batch" -> ("type" -> "keyword")) ~
- ("dataSourceParams" -> ("type" -> "keyword")) ~
- ("preparatorParams" -> ("type" -> "keyword")) ~
- ("algorithmsParams" -> ("type" -> "keyword")) ~
- ("servingParams" -> ("type" -> "keyword")) ~
- ("status" -> ("type" -> "keyword"))))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("engineId" -> ("type" -> "keyword")) ~
+ ("engineVersion" -> ("type" -> "keyword")) ~
+ ("engineVariant" -> ("type" -> "keyword")) ~
+ ("engineFactory" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("dataSourceParams" -> ("type" -> "keyword")) ~
+ ("preparatorParams" -> ("type" -> "keyword")) ~
+ ("algorithmsParams" -> ("type" -> "keyword")) ~
+ ("servingParams" -> ("type" -> "keyword")) ~
+ ("status" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def insert(i: EngineInstance): String = {
val id = i.id match {
@@ -81,9 +86,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
}
def preInsert(): Option[String] = {
+ val restClient = client.open()
try {
val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/",
Map.empty[String, String].asJava,
@@ -101,12 +107,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
case e: IOException =>
error(s"Failed to create $index/$estype", e)
None
+ } finally {
+ restClient.close()
}
}
def get(id: String): Option[EngineInstance] = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -128,19 +137,24 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
+ } finally {
+ restClient.close()
}
}
def getAll(): Seq[EngineInstance] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
@@ -148,6 +162,7 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
engineId: String,
engineVersion: String,
engineVariant: String): Seq[EngineInstance] = {
+ val restClient = client.open()
try {
val json =
("query" ->
@@ -164,11 +179,13 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
("sort" -> List(
("startTime" ->
("order" -> "desc"))))
- ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
@@ -183,9 +200,10 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
def update(i: EngineInstance): Unit = {
val id = i.id
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -201,12 +219,15 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
def delete(id: String): Unit = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -220,6 +241,8 @@ class ESEngineInstances(client: RestClient, config: StorageClientConfig, index:
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
index a965c71..ae4d86b 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineManifests.scala
@@ -37,7 +37,7 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-class ESEngineManifests(client: RestClient, config: StorageClientConfig, index: String)
+class ESEngineManifests(client: ESClient, config: StorageClientConfig, index: String)
extends EngineManifests with Logging {
implicit val formats = DefaultFormats + new EngineManifestSerializer
private val estype = "engine_manifests"
@@ -45,9 +45,10 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
def insert(engineManifest: EngineManifest): Unit = {
val id = esid(engineManifest.id, engineManifest.version)
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(engineManifest), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -63,13 +64,16 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
def get(id: String, version: String): Option[EngineManifest] = {
val esId = esid(id, version)
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$esId",
Map.empty[String, String].asJava)
@@ -91,20 +95,24 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
case e: IOException =>
error(s"Failed to access to /$index/$estype/$esId", e)
None
+ } finally {
+ restClient.close()
}
-
}
def getAll(): Seq[EngineManifest] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EngineManifest](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineManifest](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
@@ -113,8 +121,9 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
def delete(id: String, version: String): Unit = {
val esId = esid(id, version)
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$esId",
Map.empty[String, String].asJava)
@@ -128,6 +137,8 @@ class ESEngineManifests(client: RestClient, config: StorageClientConfig, index:
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$esId", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 0e71f79..1f798f0 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -38,27 +38,32 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
+class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
private val seq = new ESSequences(client, config, index)
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0))~
- ("properties" ->
- ("status" -> ("type" -> "keyword")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("evaluationClass" -> ("type" -> "keyword")) ~
- ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
- ("batch" -> ("type" -> "keyword")) ~
- ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
- ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
- ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("evaluationClass" -> ("type" -> "keyword")) ~
+ ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+ ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+ ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def insert(i: EvaluationInstance): String = {
val id = i.id match {
@@ -74,8 +79,9 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
}
def get(id: String): Option[EvaluationInstance] = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -97,23 +103,29 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
+ } finally {
+ restClient.close()
}
}
def getAll(): Seq[EvaluationInstance] = {
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def getCompleted(): Seq[EvaluationInstance] = {
+ val restClient = client.open()
try {
val json =
("query" ->
@@ -122,19 +134,22 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
("sort" ->
("startTime" ->
("order" -> "desc")))
- ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
+ ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
+ } finally {
+ restClient.close()
}
}
def update(i: EvaluationInstance): Unit = {
val id = i.id
+ val restClient = client.open()
try {
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -150,12 +165,15 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
def delete(id: String): Unit = {
+ val restClient = client.open()
try {
- val response = client.performRequest(
+ val response = restClient.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -169,6 +187,8 @@ class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, ind
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index ef25204..b4f7dc5 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -40,7 +40,7 @@ import org.json4s.ext.JodaTimeSerializers
import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
-class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
extends LEvents with Logging {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
private val seq = new ESSequences(client, config, index)
@@ -56,41 +56,47 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
- ESUtils.createIndex(client, index)
- val json =
- (estype ->
- ("_all" -> ("enabled" -> 0)) ~
- ("properties" ->
- ("name" -> ("type" -> "keyword")) ~
- ("eventId" -> ("type" -> "keyword")) ~
- ("event" -> ("type" -> "keyword")) ~
- ("entityType" -> ("type" -> "keyword")) ~
- ("entityId" -> ("type" -> "keyword")) ~
- ("targetEntityType" -> ("type" -> "keyword")) ~
- ("targetEntityId" -> ("type" -> "keyword")) ~
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val json =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)) ~
("properties" ->
- ("type" -> "nested") ~
+ ("name" -> ("type" -> "keyword")) ~
+ ("eventId" -> ("type" -> "keyword")) ~
+ ("event" -> ("type" -> "keyword")) ~
+ ("entityType" -> ("type" -> "keyword")) ~
+ ("entityId" -> ("type" -> "keyword")) ~
+ ("targetEntityType" -> ("type" -> "keyword")) ~
+ ("targetEntityId" -> ("type" -> "keyword")) ~
("properties" ->
- ("fields" -> ("type" -> "nested") ~
- ("properties" ->
- ("user" -> ("type" -> "long")) ~
- ("num" -> ("type" -> "long")))))) ~
- ("eventTime" -> ("type" -> "date")) ~
- ("tags" -> ("type" -> "keyword")) ~
- ("prId" -> ("type" -> "keyword")) ~
- ("creationTime" -> ("type" -> "date"))))
- ESUtils.createMapping(client, index, estype, compact(render(json)))
+ ("type" -> "nested") ~
+ ("properties" ->
+ ("fields" -> ("type" -> "nested") ~
+ ("properties" ->
+ ("user" -> ("type" -> "long")) ~
+ ("num" -> ("type" -> "long")))))) ~
+ ("eventTime" -> ("type" -> "date")) ~
+ ("tags" -> ("type" -> "keyword")) ~
+ ("prId" -> ("type" -> "keyword")) ~
+ ("creationTime" -> ("type" -> "date"))))
+ ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+ } finally {
+ restClient.close()
+ }
true
}
override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
+ val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- client.performRequest(
+ restClient.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map.empty[String, String].asJava,
@@ -104,14 +110,13 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
case e: Exception =>
error(s"Failed to remove $index/$estype", e)
false
+ } finally {
+ restClient.close()
}
}
override def close(): Unit = {
- try client.close() catch {
- case e: Exception =>
- error("Failed to close client.", e)
- }
+ // nothing
}
override def futureInsert(
@@ -120,15 +125,16 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
Future {
val estype = getEsType(appId, channelId)
- val id = event.eventId.getOrElse {
- var roll = seq.genNext(seqName)
- while (exists(estype, roll)) roll = seq.genNext(seqName)
- roll.toString
- }
- val json = write(event.copy(eventId = Some(id)))
+ val restClient = client.open()
try {
+ val id = event.eventId.getOrElse {
+ var roll = seq.genNext(seqName)
+ while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
+ roll.toString
+ }
+ val json = write(event.copy(eventId = Some(id)))
val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
Map.empty[String, String].asJava,
@@ -144,15 +150,17 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
}
} catch {
case e: IOException =>
- error(s"Failed to update $index/$estype/$id: $json", e)
+ error(s"Failed to update $index/$estype/<id>", e)
""
+ } finally {
+ restClient.close()
}
}
}
- private def exists(estype: String, id: Int): Boolean = {
+ private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
try {
- client.performRequest(
+ restClient.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
@@ -179,13 +187,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
Future {
val estype = getEsType(appId, channelId)
+ val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/_search",
Map.empty[String, String].asJava,
@@ -202,6 +211,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
None
+ } finally {
+ restClient.close()
}
}
}
@@ -212,13 +223,14 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
Future {
val estype = getEsType(appId, channelId)
+ val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map.empty[String, String].asJava)
@@ -234,6 +246,8 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
case e: IOException =>
error(s"Failed to update $index/$estype:$eventId", e)
false
+ } finally {
+ restClient.close()
}
}
}
@@ -253,15 +267,18 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index:
(implicit ec: ExecutionContext): Future[Iterator[Event]] = {
Future {
val estype = getEsType(appId, channelId)
+ val restClient = client.open()
try {
val query = ESUtils.createEventQuery(
startTime, untilTime, entityType, entityId,
eventNames, targetEntityType, targetEntityId, None)
- ESUtils.getAll[Event](client, index, estype, query).toIterator
+ ESUtils.getAll[Event](restClient, index, estype, query).toIterator
} catch {
case e: IOException =>
error(e.getMessage)
Iterator[Event]()
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 0e3eec8..5784b3f 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,16 +41,10 @@ import org.json4s.native.JsonMethods._
import org.json4s.ext.JodaTimeSerializers
-class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
extends PEvents {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
- // client is not used.
- try client.close() catch {
- case e: Exception =>
- logger.error("Failed to close client.", e)
- }
-
def getEsType(appId: Int, channelId: Option[Int] = None): String = {
channelId.map { ch =>
s"${appId}_${ch}"
@@ -114,7 +108,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
eventIds: RDD[String],
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
val estype = getEsType(appId, channelId)
- val restClient = ESUtils.createRestClient(config)
+ val restClient = client.open()
try {
eventIds.foreachPartition { iter =>
iter.foreach { eventId =>
@@ -124,7 +118,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map.empty[String, String].asJava)
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index c067f3a..4eb8cd7 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -35,20 +35,26 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
implicit val formats = DefaultFormats
private val estype = "sequences"
- ESUtils.createIndex(client, index)
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> 0)))
- ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
+ val restClient = client.open()
+ try {
+ ESUtils.createIndex(restClient, index)
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> 0)))
+ ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+ } finally {
+ restClient.close()
+ }
def genNext(name: String): Int = {
+ val restClient = client.open()
try {
val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
- val response = client.performRequest(
+ val response = restClient.performRequest(
"POST",
s"/$index/$estype/$name",
Map.empty[String, String].asJava,
@@ -66,6 +72,8 @@ class ESSequences(client: RestClient, config: StorageClientConfig, index: String
} catch {
case e: IOException =>
throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
+ } finally {
+ restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index 68e3f57..db841b6 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -151,15 +151,13 @@ object ESUtils {
|}""".stripMargin
}
- def createRestClient(config: StorageClientConfig): RestClient = {
+ def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
val hosts = config.properties.get("HOSTS").
map(_.split(",").toSeq).getOrElse(Seq("localhost"))
val ports = config.properties.get("PORTS").
map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
val schemes = config.properties.get("SCHEMES").
map(_.split(",").toSeq).getOrElse(Seq("http"))
- val httpHosts = (hosts, ports, schemes).zipped.map(
- (h, p, s) => new HttpHost(h, p, s))
- RestClient.builder(httpHosts: _*).build()
+ (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/d4e75ab5/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 912d467..647d180 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -17,22 +17,28 @@
package org.apache.predictionio.data.storage.elasticsearch
-import grizzled.slf4j.Logging
+import org.apache.http.HttpHost
import org.apache.predictionio.data.storage.BaseStorageClient
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.predictionio.data.storage.StorageClientException
-import java.net.InetAddress
import org.elasticsearch.client.RestClient
-import org.apache.http.HttpHost
+
+import grizzled.slf4j.Logging
+
+case class ESClient(hosts: Seq[HttpHost]) {
+ def open(): RestClient = {
+ try {
+ RestClient.builder(hosts: _*).build()
+ } catch {
+ case e: Throwable =>
+ throw new StorageClientException(e.getMessage, e)
+ }
+ }
+}
class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
with Logging {
override val prefix = "ES"
- val client = try {
- ESUtils.createRestClient(config)
- } catch {
- case e: Throwable =>
- throw new StorageClientException(e.getMessage, e)
- }
+ val client = ESClient(ESUtils.getHttpHosts(config))
}