Posted to commits@predictionio.apache.org by ch...@apache.org on 2017/09/28 15:55:25 UTC
[44/57] [abbrv] incubator-predictionio git commit: [PIO-106,PIO-114] Elasticsearch 5.x singleton client with authentication
[PIO-106,PIO-114] Elasticsearch 5.x singleton client with authentication
Closes #421
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/bf84ede6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/bf84ede6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/bf84ede6
Branch: refs/heads/livedoc
Commit: bf84ede6fe475ec591e784eb453c6194befb8515
Parents: eb61358
Author: Mars Hall <ma...@heroku.com>
Authored: Tue Aug 29 11:38:34 2017 -0700
Committer: Mars Hall <ma...@heroku.com>
Committed: Tue Aug 29 11:38:34 2017 -0700
----------------------------------------------------------------------
bin/compute-classpath.sh | 21 ++-
conf/pio-env.sh.template | 3 +
.../predictionio/workflow/BatchPredict.scala | 140 ++++++++++---------
.../workflow/CleanupFunctions.scala | 65 +++++++++
.../predictionio/workflow/CoreWorkflow.scala | 66 +++++----
.../predictionio/data/storage/Storage.scala | 8 +-
storage/elasticsearch/build.sbt | 8 +-
.../storage/elasticsearch/ESAccessKeys.scala | 55 +++-----
.../data/storage/elasticsearch/ESApps.scala | 55 +++-----
.../data/storage/elasticsearch/ESChannels.scala | 50 +++----
.../elasticsearch/ESEngineInstances.scala | 84 ++++-------
.../elasticsearch/ESEvaluationInstances.scala | 71 ++++------
.../data/storage/elasticsearch/ESLEvents.scala | 49 +++++--
.../data/storage/elasticsearch/ESPEvents.scala | 29 ++--
.../storage/elasticsearch/ESSequences.scala | 30 ++--
.../storage/elasticsearch/StorageClient.scala | 69 ++++++++-
.../elasticsearch/StorageClientSpec.scala | 67 +++++++++
.../elasticsearch/StorageTestUtils.scala | 28 ++++
storage/hdfs/project/build.properties | 1 +
.../tools/export/EventsToFile.scala | 62 ++++----
.../predictionio/tools/imprt/FileToEvents.scala | 64 +++++----
21 files changed, 596 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/bin/compute-classpath.sh
----------------------------------------------------------------------
diff --git a/bin/compute-classpath.sh b/bin/compute-classpath.sh
index 3e59ca7..7a38e0b 100755
--- a/bin/compute-classpath.sh
+++ b/bin/compute-classpath.sh
@@ -42,8 +42,25 @@ ASSEMBLY_JARS=$(printf "${MAIN_JAR}\n${DATA_JARS}" | paste -sd "," -)
# Build up classpath
CLASSPATH="${PIO_CONF_DIR}"
-CLASSPATH="$CLASSPATH:${FWDIR}/plugins/*"
-CLASSPATH="$CLASSPATH:${assembly_folder}/spark/*"
+
+# stable classpath for plugin JARs
+if [ -d "${FWDIR}/plugins" ]; then
+ lib_plugin_jars=`ls "${FWDIR}"/plugins/*`
+ lib_plugin_classpath=''
+ for J in $lib_plugin_jars; do
+ lib_plugin_classpath="${lib_plugin_classpath}:${J}"
+ done
+ CLASSPATH="$CLASSPATH${lib_plugin_classpath}"
+fi
+
+# stable classpath for Spark JARs
+lib_spark_jars=`ls "${assembly_folder}"/spark/*.jar`
+lib_spark_classpath=''
+for J in $lib_spark_jars; do
+ lib_spark_classpath="${lib_spark_classpath}:${J}"
+done
+CLASSPATH="$CLASSPATH${lib_spark_classpath}"
+
CLASSPATH="$CLASSPATH:${MAIN_JAR}"
# Add hadoop conf dir if given -- otherwise FileSystem.*, etc fail ! Note, this
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index 832b422..0b6b5b9 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -90,6 +90,9 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.4.1
+# Optional basic HTTP auth
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret
# Elasticsearch 1.x Example
# PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>
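The new USERNAME/PASSWORD settings above are optional; authentication is disabled only when both are unset, as the StorageClient.scala change later in this commit shows. A minimal Scala sketch of that same resolution rule — note that in PredictionIO itself these values arrive via config.properties after pio-env parsing; reading the raw environment variables here is purely illustrative:

  object EsBasicAuthSketch {
    def main(args: Array[String]): Unit = {
      // In PredictionIO these land in config.properties as "USERNAME"/"PASSWORD";
      // reading the raw variables here only for illustration.
      val username = sys.env.get("PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME")
      val password = sys.env.get("PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD")
      // Same rule as StorageClient below: auth is disabled only when both are unset.
      val basicAuth: Option[(String, String)] = (username, password) match {
        case (None, None) => None
        case (u, p) => Some((u.getOrElse(""), p.getOrElse("")))
      }
      println(basicAuth.fold("no basic auth")(c => s"basic auth as ${c._1}"))
    }
  }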
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
index 5420638..69525b1 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala
@@ -28,6 +28,7 @@ import org.apache.predictionio.controller.{Engine, Utils}
import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer}
import org.apache.predictionio.data.storage.{EngineInstance, Storage}
import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.CleanupFunctions
import org.apache.spark.rdd.RDD
import org.json4s._
import org.json4s.native.JsonMethods._
@@ -146,84 +147,89 @@ object BatchPredict extends Logging {
engineInstance: EngineInstance,
engine: Engine[_, _, _, Q, P, _]): Unit = {
- val engineParams = engine.engineInstanceToEngineParams(
- engineInstance, config.jsonExtractor)
+ try {
+ val engineParams = engine.engineInstanceToEngineParams(
+ engineInstance, config.jsonExtractor)
- val kryo = KryoInstantiator.newKryoInjection
+ val kryo = KryoInstantiator.newKryoInjection
- val modelsFromEngineInstance =
- kryo.invert(modeldata.get(engineInstance.id).get.models).get.
- asInstanceOf[Seq[Any]]
+ val modelsFromEngineInstance =
+ kryo.invert(modeldata.get(engineInstance.id).get.models).get.
+ asInstanceOf[Seq[Any]]
- val prepareSparkContext = WorkflowContext(
- batch = engineInstance.engineFactory,
- executorEnv = engineInstance.env,
- mode = "Batch Predict (model)",
- sparkEnv = engineInstance.sparkConf)
+ val prepareSparkContext = WorkflowContext(
+ batch = engineInstance.engineFactory,
+ executorEnv = engineInstance.env,
+ mode = "Batch Predict (model)",
+ sparkEnv = engineInstance.sparkConf)
- val models = engine.prepareDeploy(
- prepareSparkContext,
- engineParams,
- engineInstance.id,
- modelsFromEngineInstance,
- params = WorkflowParams()
- )
+ val models = engine.prepareDeploy(
+ prepareSparkContext,
+ engineParams,
+ engineInstance.id,
+ modelsFromEngineInstance,
+ params = WorkflowParams()
+ )
- val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
- Doer(engine.algorithmClassMap(n), p)
- }
+ val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
+ Doer(engine.algorithmClassMap(n), p)
+ }
- val servingParamsWithName = engineParams.servingParams
+ val servingParamsWithName = engineParams.servingParams
- val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
- servingParamsWithName._2)
+ val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
+ servingParamsWithName._2)
- val runSparkContext = WorkflowContext(
- batch = engineInstance.engineFactory,
- executorEnv = engineInstance.env,
- mode = "Batch Predict (runner)",
- sparkEnv = engineInstance.sparkConf)
+ val runSparkContext = WorkflowContext(
+ batch = engineInstance.engineFactory,
+ executorEnv = engineInstance.env,
+ mode = "Batch Predict (runner)",
+ sparkEnv = engineInstance.sparkConf)
- val inputRDD: RDD[String] = runSparkContext.
- textFile(config.inputFilePath).
- filter(_.trim.nonEmpty)
- val queriesRDD: RDD[String] = config.queryPartitions match {
- case Some(p) => inputRDD.repartition(p)
- case None => inputRDD
- }
+ val inputRDD: RDD[String] = runSparkContext.
+ textFile(config.inputFilePath).
+ filter(_.trim.nonEmpty)
+ val queriesRDD: RDD[String] = config.queryPartitions match {
+ case Some(p) => inputRDD.repartition(p)
+ case None => inputRDD
+ }
- val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
- val jsonExtractorOption = config.jsonExtractor
- // Extract Query from Json
- val query = JsonExtractor.extract(
- jsonExtractorOption,
- queryString,
- algorithms.head.queryClass,
- algorithms.head.querySerializer,
- algorithms.head.gsonTypeAdapterFactories
- )
- // Deploy logic. First call Serving.supplement, then Algo.predict,
- // finally Serving.serve.
- val supplementedQuery = serving.supplementBase(query)
- // TODO: Parallelize the following.
- val predictions = algorithms.zip(models).map { case (a, m) =>
- a.predictBase(m, supplementedQuery)
+ val predictionsRDD: RDD[String] = queriesRDD.map { queryString =>
+ val jsonExtractorOption = config.jsonExtractor
+ // Extract Query from Json
+ val query = JsonExtractor.extract(
+ jsonExtractorOption,
+ queryString,
+ algorithms.head.queryClass,
+ algorithms.head.querySerializer,
+ algorithms.head.gsonTypeAdapterFactories
+ )
+ // Deploy logic. First call Serving.supplement, then Algo.predict,
+ // finally Serving.serve.
+ val supplementedQuery = serving.supplementBase(query)
+ // TODO: Parallelize the following.
+ val predictions = algorithms.zip(models).map { case (a, m) =>
+ a.predictBase(m, supplementedQuery)
+ }
+ // Notice that it is by design to call Serving.serve with the
+ // *original* query.
+ val prediction = serving.serveBase(query, predictions)
+ // Combine query with prediction, so the batch results are
+ // self-descriptive.
+ val predictionJValue = JsonExtractor.toJValue(
+ jsonExtractorOption,
+ Map("query" -> query,
+ "prediction" -> prediction),
+ algorithms.head.querySerializer,
+ algorithms.head.gsonTypeAdapterFactories)
+ // Return JSON string
+ compact(render(predictionJValue))
}
- // Notice that it is by design to call Serving.serve with the
- // *original* query.
- val prediction = serving.serveBase(query, predictions)
- // Combine query with prediction, so the batch results are
- // self-descriptive.
- val predictionJValue = JsonExtractor.toJValue(
- jsonExtractorOption,
- Map("query" -> query,
- "prediction" -> prediction),
- algorithms.head.querySerializer,
- algorithms.head.gsonTypeAdapterFactories)
- // Return JSON string
- compact(render(predictionJValue))
- }
- predictionsRDD.saveAsTextFile(config.outputFilePath)
+ predictionsRDD.saveAsTextFile(config.outputFilePath)
+
+ } finally {
+ CleanupFunctions.run()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
new file mode 100644
index 0000000..bdd8b01
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CleanupFunctions.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.workflow
+
+/** :: DeveloperApi ::
+ * Singleton object that collects anonymous functions to be
+ * executed to allow the process to end gracefully.
+ *
+ * For example, the Elasticsearch REST storage client
+ * maintains an internal connection pool that must
+ * be closed to allow the process to exit.
+ */
+object CleanupFunctions {
+ @volatile private var functions: Seq[() => Unit] = Seq.empty[() => Unit]
+
+ /** Add a function to be called during cleanup.
+ *
+ * {{{
+ * import org.apache.predictionio.workflow.CleanupFunctions
+ *
+ * CleanupFunctions.add { MyStorageClass.close }
+ * }}}
+ *
+ * @param f anonymous function containing cleanup code.
+ */
+ def add(f: () => Unit): Seq[() => Unit] = {
+ functions = functions :+ f
+ functions
+ }
+
+ /** Call all cleanup functions in order added.
+ *
+ * {{{
+ * import org.apache.predictionio.workflow.CleanupFunctions
+ *
+ * try {
+ * // Much code that needs cleanup
+ * // whether successful or error thrown.
+ * } finally {
+ * CleanupFunctions.run()
+ * }
+ * }}}
+ *
+ * Note that the list of registered functions is not cleared.
+ */
+ def run(): Unit = {
+ functions.foreach { f => f() }
+ }
+}
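Taken together, a typical lifecycle pairs the two calls documented above: register a cleanup function where the resource is created, then run all of them in a finally block. A minimal sketch, assuming a hypothetical MyStorageClient standing in for a real resource:

  import org.apache.predictionio.workflow.CleanupFunctions

  object CleanupLifecycleSketch {
    // Hypothetical resource standing in for a real storage client.
    object MyStorageClient { def close(): Unit = println("connection pool closed") }

    def main(args: Array[String]): Unit = {
      // Registration typically happens where the resource is created...
      CleanupFunctions.add(() => MyStorageClient.close())
      try {
        // ...work that may succeed or throw...
      } finally {
        CleanupFunctions.run() // runs every registered function, in order added
      }
    }
  }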
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
index d956fd7..65270a2 100644
--- a/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
+++ b/core/src/main/scala/org/apache/predictionio/workflow/CoreWorkflow.scala
@@ -96,6 +96,7 @@ object CoreWorkflow {
}
} finally {
logger.debug("Stopping SparkContext")
+ CleanupFunctions.run()
sc.stop()
}
}
@@ -123,40 +124,43 @@ object CoreWorkflow {
env,
params.sparkEnv,
mode.capitalize)
- val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)
- logger.info(s"Starting evaluation instance ID: $evaluationInstanceId")
-
- val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation(
- sc,
- evaluation,
- engine,
- engineParamsList,
- evaluator,
- params)
-
- if (evaluatorResult.noSave) {
- logger.info(s"This evaluation result is not inserted into database: $evaluatorResult")
- } else {
- val evaluatedEvaluationInstance = evaluationInstance.copy(
- status = "EVALCOMPLETED",
- id = evaluationInstanceId,
- endTime = DateTime.now,
- evaluatorResults = evaluatorResult.toOneLiner,
- evaluatorResultsHTML = evaluatorResult.toHTML,
- evaluatorResultsJSON = evaluatorResult.toJSON
- )
-
- logger.info(s"Updating evaluation instance with result: $evaluatorResult")
+ try {
+ val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)
+
+ logger.info(s"Starting evaluation instance ID: $evaluationInstanceId")
+
+ val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation(
+ sc,
+ evaluation,
+ engine,
+ engineParamsList,
+ evaluator,
+ params)
+
+ if (evaluatorResult.noSave) {
+ logger.info(s"This evaluation result is not inserted into database: $evaluatorResult")
+ } else {
+ val evaluatedEvaluationInstance = evaluationInstance.copy(
+ status = "EVALCOMPLETED",
+ id = evaluationInstanceId,
+ endTime = DateTime.now,
+ evaluatorResults = evaluatorResult.toOneLiner,
+ evaluatorResultsHTML = evaluatorResult.toHTML,
+ evaluatorResultsJSON = evaluatorResult.toJSON
+ )
+
+ logger.info(s"Updating evaluation instance with result: $evaluatorResult")
+
+ evaluationInstances.update(evaluatedEvaluationInstance)
+ }
+ logger.info("runEvaluation completed")
- evaluationInstances.update(evaluatedEvaluationInstance)
+ } finally {
+ logger.debug("Stop SparkContext")
+ CleanupFunctions.run()
+ sc.stop()
}
-
- logger.debug("Stop SparkContext")
-
- sc.stop()
-
- logger.info("runEvaluation completed")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
index 52442a6..70e1973 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala
@@ -19,6 +19,7 @@
package org.apache.predictionio.data.storage
import grizzled.slf4j.Logging
+import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.predictionio.annotation.DeveloperApi
import scala.concurrent.ExecutionContext.Implicits.global
@@ -214,7 +215,8 @@ object Storage extends Logging {
}
} catch {
case e: Throwable =>
- error(e.getMessage)
+ val stackTrace = ExceptionUtils.getStackTrace(e)
+ error(s"${e.getMessage}\n${stackTrace}\n\n")
errors += 1
r -> DataObjectMeta("", "")
}
@@ -282,7 +284,9 @@ object Storage extends Logging {
Some(ClientMeta(sourceType, client, clientConfig))
} catch {
case e: Throwable =>
- error(s"Error initializing storage client for source ${k}", e)
+ val stackTrace = ExceptionUtils.getStackTrace(e)
+ error(s"Error initializing storage client for source ${k}.\n" +
+ s"${stackTrace}\n\n")
errors += 1
None
}
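The two hunks above switch from logging only the exception message to logging the full stack trace. A minimal sketch of the same pattern, assuming only that commons-lang3 is on the classpath (it is already imported above); the exception message is illustrative:

  import org.apache.commons.lang3.exception.ExceptionUtils

  object StackTraceLoggingSketch {
    def main(args: Array[String]): Unit = {
      try {
        throw new RuntimeException("storage source misconfigured") // illustrative failure
      } catch {
        case e: Throwable =>
          // getStackTrace renders the full trace as a single String,
          // so it survives loggers that only print the message argument.
          println(s"${e.getMessage}\n${ExceptionUtils.getStackTrace(e)}")
      }
    }
  }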
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/build.sbt
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/build.sbt b/storage/elasticsearch/build.sbt
index da4842e..b60e86e 100644
--- a/storage/elasticsearch/build.sbt
+++ b/storage/elasticsearch/build.sbt
@@ -30,7 +30,7 @@ libraryDependencies ++= Seq(
"org.elasticsearch" %% elasticsearchSparkArtifact.value % elasticsearchVersion.value
exclude("org.apache.spark", "*"),
"org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value,
- "org.scalatest" %% "scalatest" % "2.1.7" % "test")
+ "org.specs2" %% "specs2" % "2.3.13" % "test")
parallelExecution in Test := false
@@ -39,8 +39,10 @@ pomExtra := childrenPomExtra.value
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
assemblyShadeRules in assembly := Seq(
- ShadeRule.rename("org.apache.http.**" -> "shadeio.data.http.@1").inAll
-)
+ ShadeRule.rename("org.apache.http.**" ->
+ "org.apache.predictionio.shaded.org.apache.http.@1").inAll,
+ ShadeRule.rename("org.elasticsearch.client.**" ->
+ "org.apache.predictionio.shaded.org.elasticsearch.client.@1").inAll)
// skip test in assembly
test in assembly := {}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 98c2781..73ef1d0 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -27,36 +27,30 @@ import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.AccessKey
import org.apache.predictionio.data.storage.AccessKeys
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
+class ESAccessKeys(client: RestClient, config: StorageClientConfig, index: String)
extends AccessKeys with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "accesskeys"
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("key" -> ("type" -> "keyword")) ~
- ("events" -> ("type" -> "keyword"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("key" -> ("type" -> "keyword")) ~
+ ("events" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(accessKey: AccessKey): Option[String] = {
val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
@@ -68,9 +62,8 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
if (id.isEmpty) {
return None
}
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -92,50 +85,41 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
- } finally {
- restClient.close()
}
}
def getAll(): Seq[AccessKey] = {
- val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def getByAppid(appid: Int): Seq[AccessKey] = {
- val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("appid" -> appid)))
- ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[AccessKey](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def update(accessKey: AccessKey): Unit = {
val id = accessKey.key
- val restClient = client.open()
try {
val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava,
@@ -151,15 +135,12 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
def delete(id: String): Unit = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava)
@@ -173,8 +154,6 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/id", e)
- } finally {
- restClient.close()
}
}
}
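The same refactoring repeats in the remaining DAO classes below: each constructor now receives the shared RestClient instead of opening and closing a connection around every call. A minimal sketch of constructing one against the singleton client, assuming StorageClientConfig's no-argument defaults and an illustrative host and index name:

  import org.apache.http.HttpHost
  import org.apache.predictionio.data.storage.StorageClientConfig
  import org.apache.predictionio.data.storage.elasticsearch.{ESAccessKeys, ESClient}

  object SharedClientSketch {
    def main(args: Array[String]): Unit = {
      // Requires a reachable Elasticsearch 5.x; host and index name are illustrative.
      val client = ESClient.open(Seq(new HttpHost("localhost", 9200, "http")))
      try {
        val accessKeys = new ESAccessKeys(client, StorageClientConfig(), "pio_meta")
        accessKeys.getAll().foreach(println)
      } finally {
        ESClient.close() // closes the singleton's underlying connection pool
      }
    }
  }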
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 6afed12..ba48065 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -27,37 +27,31 @@ import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.App
import org.apache.predictionio.data.storage.Apps
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
/** Elasticsearch implementation of Items. */
-class ESApps(client: ESClient, config: StorageClientConfig, index: String)
+class ESApps(client: RestClient, config: StorageClientConfig, index: String)
extends Apps with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "apps"
private val seq = new ESSequences(client, config, index)
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("id" -> ("type" -> "keyword")) ~
- ("name" -> ("type" -> "keyword"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("id" -> ("type" -> "keyword")) ~
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(app: App): Option[Int] = {
val id = app.id match {
@@ -77,9 +71,8 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
}
def get(id: Int): Option[App] = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -101,20 +94,17 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
- } finally {
- restClient.close()
}
}
def getByName(name: String): Option[App] = {
- val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("name" -> name)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/_search",
Map.empty[String, String].asJava,
@@ -131,33 +121,27 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
None
- } finally {
- restClient.close()
}
}
def getAll(): Seq[App] = {
- val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> Nil))
- ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[App](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def update(app: App): Unit = {
val id = app.id.toString
- val restClient = client.open()
try {
val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava,
@@ -173,15 +157,12 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
def delete(id: Int): Unit = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava)
@@ -195,8 +176,6 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/id", e)
- } finally {
- restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index c142beb..b5eb5c8 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -27,35 +27,29 @@ import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.Channel
import org.apache.predictionio.data.storage.Channels
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
+class ESChannels(client: RestClient, config: StorageClientConfig, index: String)
extends Channels with Logging {
implicit val formats = DefaultFormats.lossless
private val estype = "channels"
private val seq = new ESSequences(client, config, index)
-
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("name" -> ("type" -> "keyword"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("name" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(channel: Channel): Option[Int] = {
val id = channel.id match {
@@ -75,9 +69,8 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
}
def get(id: Int): Option[Channel] = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -99,34 +92,28 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
- } finally {
- restClient.close()
}
}
def getByAppid(appid: Int): Seq[Channel] = {
- val restClient = client.open()
try {
val json =
("query" ->
("term" ->
("appid" -> appid)))
- ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[Channel](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def update(channel: Channel): Boolean = {
val id = channel.id.toString
- val restClient = client.open()
try {
val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava,
@@ -144,15 +131,12 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
false
- } finally {
- restClient.close()
}
}
def delete(id: Int): Unit = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava)
@@ -166,8 +150,6 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index de474c1..eec5b64 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -28,46 +28,40 @@ import org.apache.predictionio.data.storage.EngineInstance
import org.apache.predictionio.data.storage.EngineInstanceSerializer
import org.apache.predictionio.data.storage.EngineInstances
import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
+class ESEngineInstances(client: RestClient, config: StorageClientConfig, index: String)
extends EngineInstances with Logging {
implicit val formats = DefaultFormats + new EngineInstanceSerializer
private val estype = "engine_instances"
-
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("status" -> ("type" -> "keyword")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("engineId" -> ("type" -> "keyword")) ~
- ("engineVersion" -> ("type" -> "keyword")) ~
- ("engineVariant" -> ("type" -> "keyword")) ~
- ("engineFactory" -> ("type" -> "keyword")) ~
- ("batch" -> ("type" -> "keyword")) ~
- ("dataSourceParams" -> ("type" -> "keyword")) ~
- ("preparatorParams" -> ("type" -> "keyword")) ~
- ("algorithmsParams" -> ("type" -> "keyword")) ~
- ("servingParams" -> ("type" -> "keyword")) ~
- ("status" -> ("type" -> "keyword"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("engineId" -> ("type" -> "keyword")) ~
+ ("engineVersion" -> ("type" -> "keyword")) ~
+ ("engineVariant" -> ("type" -> "keyword")) ~
+ ("engineFactory" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("dataSourceParams" -> ("type" -> "keyword")) ~
+ ("preparatorParams" -> ("type" -> "keyword")) ~
+ ("algorithmsParams" -> ("type" -> "keyword")) ~
+ ("servingParams" -> ("type" -> "keyword")) ~
+ ("status" -> ("type" -> "keyword"))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(i: EngineInstance): String = {
val id = i.id match {
@@ -88,10 +82,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
}
def preInsert(): Option[String] = {
- val restClient = client.open()
try {
val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/",
Map("refresh" -> "true").asJava,
@@ -109,15 +102,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
case e: IOException =>
error(s"Failed to create $index/$estype", e)
None
- } finally {
- restClient.close()
}
}
def get(id: String): Option[EngineInstance] = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -139,24 +129,19 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
- } finally {
- restClient.close()
}
}
def getAll(): Seq[EngineInstance] = {
- val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
@@ -164,7 +149,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
engineId: String,
engineVersion: String,
engineVariant: String): Seq[EngineInstance] = {
- val restClient = client.open()
try {
val json =
("query" ->
@@ -181,13 +165,11 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
("sort" -> List(
("startTime" ->
("order" -> "desc"))))
- ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[EngineInstance](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error(s"Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
@@ -202,10 +184,9 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
def update(i: EngineInstance): Unit = {
val id = i.id
- val restClient = client.open()
try {
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava,
@@ -221,15 +202,12 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
def delete(id: String): Unit = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava)
@@ -243,8 +221,6 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 9b19cf4..1706583 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -29,43 +29,37 @@ import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
import org.apache.predictionio.data.storage.EvaluationInstances
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
+class ESEvaluationInstances(client: RestClient, config: StorageClientConfig, index: String)
extends EvaluationInstances with Logging {
implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
private val estype = "evaluation_instances"
private val seq = new ESSequences(client, config, index)
-
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("status" -> ("type" -> "keyword")) ~
- ("startTime" -> ("type" -> "date")) ~
- ("endTime" -> ("type" -> "date")) ~
- ("evaluationClass" -> ("type" -> "keyword")) ~
- ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
- ("batch" -> ("type" -> "keyword")) ~
- ("evaluatorResults" -> ("type" -> "text")) ~
- ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
- ("evaluatorResultsJSON" -> ("enabled" -> false))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("status" -> ("type" -> "keyword")) ~
+ ("startTime" -> ("type" -> "date")) ~
+ ("endTime" -> ("type" -> "date")) ~
+ ("evaluationClass" -> ("type" -> "keyword")) ~
+ ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+ ("batch" -> ("type" -> "keyword")) ~
+ ("evaluatorResults" -> ("type" -> "text")) ~
+ ("evaluatorResultsHTML" -> ("enabled" -> false)) ~
+ ("evaluatorResultsJSON" -> ("enabled" -> false))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def insert(i: EvaluationInstance): String = {
val id = i.id match {
@@ -85,9 +79,8 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
}
def get(id: String): Option[EvaluationInstance] = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"GET",
s"/$index/$estype/$id",
Map.empty[String, String].asJava)
@@ -109,29 +102,23 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
case e: IOException =>
error(s"Failed to access to /$index/$estype/$id", e)
None
- } finally {
- restClient.close()
}
}
def getAll(): Seq[EvaluationInstance] = {
- val restClient = client.open()
try {
val json =
("query" ->
("match_all" -> List.empty))
- ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def getCompleted(): Seq[EvaluationInstance] = {
- val restClient = client.open()
try {
val json =
("query" ->
@@ -140,22 +127,19 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
("sort" ->
("startTime" ->
("order" -> "desc")))
- ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+ ESUtils.getAll[EvaluationInstance](client, index, estype, compact(render(json)))
} catch {
case e: IOException =>
error("Failed to access to /$index/$estype/_search", e)
Nil
- } finally {
- restClient.close()
}
}
def update(i: EvaluationInstance): Unit = {
val id = i.id
- val restClient = client.open()
try {
val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava,
@@ -171,15 +155,12 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
def delete(id: String): Unit = {
- val restClient = client.open()
try {
- val response = restClient.performRequest(
+ val response = client.performRequest(
"DELETE",
s"/$index/$estype/$id",
Map("refresh" -> "true").asJava)
@@ -193,8 +174,6 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/$id", e)
- } finally {
- restClient.close()
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
index 6c0c4a7..5e1f4c1 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
@@ -28,6 +28,7 @@ import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.storage.LEvents
import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.{ResponseException, RestClient}
import org.joda.time.DateTime
import org.json4s._
import org.json4s.JsonDSL._
@@ -37,10 +38,9 @@ import org.json4s.ext.JodaTimeSerializers
import grizzled.slf4j.Logging
import org.apache.http.message.BasicHeader
-class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
+class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String)
extends LEvents with Logging {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
- val restClient = client.open()
def getEsType(appId: Int, channelId: Option[Int] = None): String = {
channelId.map { ch =>
@@ -52,7 +52,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
val estype = getEsType(appId, channelId)
- ESUtils.createIndex(restClient, index,
+ ESUtils.createIndex(client, index,
ESUtils.getNumberOfShards(config, index.toUpperCase),
ESUtils.getNumberOfReplicas(config, index.toUpperCase))
val json =
@@ -71,7 +71,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("tags" -> ("type" -> "keyword")) ~
("prId" -> ("type" -> "keyword")) ~
("creationTime" -> ("type" -> "date"))))
- ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+ ESUtils.createMapping(client, index, estype, compact(render(json)))
true
}
@@ -82,7 +82,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("query" ->
("match_all" -> List.empty))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- restClient.performRequest(
+ client.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -99,9 +99,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
}
}
- override def close(): Unit = {
- restClient.close()
- }
+ override def close(): Unit = {}
override def futureInsert(
event: Event,
@@ -126,7 +124,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
("properties" -> write(event.properties.toJObject))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$id",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -185,7 +183,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
}.mkString("", "\n", "\n")
val entity = new StringEntity(json)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
"/_bulk",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -215,6 +213,29 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
}
}
+ private def exists(client: RestClient, estype: String, id: Int): Boolean = {
+ try {
+ client.performRequest(
+ "GET",
+ s"/$index/$estype/$id",
+ Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+ case 200 => true
+ case _ => false
+ }
+ } catch {
+ case e: ResponseException =>
+ e.getResponse.getStatusLine.getStatusCode match {
+ case 404 => false
+ case _ =>
+ error(s"Failed to access to /$index/$estype/$id", e)
+ false
+ }
+ case e: IOException =>
+ error(s"Failed to access to $index/$estype/$id", e)
+ false
+ }
+ }
+
override def futureGet(
eventId: String,
appId: Int,
@@ -227,7 +248,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/_search",
Map.empty[String, String].asJava,
@@ -260,7 +281,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
@@ -301,8 +322,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
startTime, untilTime, entityType, entityId,
eventNames, targetEntityType, targetEntityId, reversed)
limit.getOrElse(20) match {
- case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator
- case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator
+ case -1 => ESUtils.getEventAll(client, index, estype, query).toIterator
+ case size => ESUtils.getEvents(client, index, estype, query, size).toIterator
}
} catch {
case e: IOException =>
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
index 9f0a188..75f7639 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._
import org.json4s.ext.JodaTimeSerializers
-class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
+class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
extends PEvents {
implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
@@ -107,8 +107,6 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
eventIds: RDD[String],
appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
val estype = getEsType(appId, channelId)
- val restClient = client.open()
- try {
eventIds.foreachPartition { iter =>
iter.foreach { eventId =>
try {
@@ -117,28 +115,23 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
("term" ->
("eventId" -> eventId)))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/_delete_by_query",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
entity)
- val jsonResponse = parse(EntityUtils.toString(response.getEntity))
- val result = (jsonResponse \ "result").extract[String]
- result match {
- case "deleted" => true
- case _ =>
- logger.error(s"[$result] Failed to update $index/$estype:$eventId")
- false
- }
- } catch {
- case e: IOException =>
- logger.error(s"Failed to update $index/$estype:$eventId", e)
- false
+ val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+ val result = (jsonResponse \ "result").extract[String]
+ result match {
+ case "deleted" =>
+ case _ =>
+ logger.error(s"[$result] Failed to update $index/$estype:$eventId")
}
+ } catch {
+ case e: IOException =>
+ logger.error(s"Failed to update $index/$estype:$eventId", e)
}
}
- } finally {
- restClient.close()
}
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 9fd31a3..018ef85 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -35,30 +35,24 @@ import org.json4s.native.Serialization.write
import grizzled.slf4j.Logging
-class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
implicit val formats = DefaultFormats
private val estype = "sequences"
- val restClient = client.open()
- try {
- ESUtils.createIndex(restClient, index,
- ESUtils.getNumberOfShards(config, index.toUpperCase),
- ESUtils.getNumberOfReplicas(config, index.toUpperCase))
- val mappingJson =
- (estype ->
- ("_all" -> ("enabled" -> false)) ~
- ("properties" ->
- ("n" -> ("enabled" -> false))))
- ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
- } finally {
- restClient.close()
- }
+ ESUtils.createIndex(client, index,
+ ESUtils.getNumberOfShards(config, index.toUpperCase),
+ ESUtils.getNumberOfReplicas(config, index.toUpperCase))
+ val mappingJson =
+ (estype ->
+ ("_all" -> ("enabled" -> false)) ~
+ ("properties" ->
+ ("n" -> ("enabled" -> false))))
+ ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
def genNext(name: String): Long = {
- val restClient = client.open()
try {
val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
- val response = restClient.performRequest(
+ val response = client.performRequest(
"POST",
s"/$index/$estype/$name",
Map("refresh" -> "false").asJava,
@@ -76,8 +70,6 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String)
} catch {
case e: IOException =>
throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
- } finally {
- restClient.close()
}
}
}
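
The unchanged remainder of genNext (outside this hunk) turns that write into a
counter: re-indexing the same document id makes Elasticsearch increment the
document's _version, and that version becomes the next sequence value. A hedged
standalone sketch of the trick, with host and index names as placeholders:

    import org.apache.http.HttpHost
    import org.apache.http.entity.ContentType
    import org.apache.http.nio.entity.NStringEntity
    import org.apache.http.util.EntityUtils
    import org.elasticsearch.client.RestClient
    import scala.collection.JavaConverters._

    object SequenceSketch {
      def main(args: Array[String]): Unit = {
        val client = RestClient.builder(new HttpHost("localhost", 9200)).build()
        try {
          val entity = new NStringEntity("""{"n": "events"}""", ContentType.APPLICATION_JSON)
          // Two writes to the same id: the second response reports "_version": 2.
          (1 to 2).foreach { _ =>
            val response = client.performRequest(
              "POST", "/pio_meta/sequences/events",
              Map("refresh" -> "false").asJava, entity)
            println(EntityUtils.toString(response.getEntity))
          }
        } finally {
          client.close()
        }
      }
    }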
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 647d180..d2c69b9 100644
--- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -18,27 +18,84 @@
package org.apache.predictionio.data.storage.elasticsearch
import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
import org.apache.predictionio.data.storage.BaseStorageClient
import org.apache.predictionio.data.storage.StorageClientConfig
import org.apache.predictionio.data.storage.StorageClientException
+import org.apache.predictionio.workflow.CleanupFunctions
import org.elasticsearch.client.RestClient
+import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
import grizzled.slf4j.Logging
-case class ESClient(hosts: Seq[HttpHost]) {
- def open(): RestClient = {
+object ESClient extends Logging {
+ private var _sharedRestClient: Option[RestClient] = None
+
+ def open(
+ hosts: Seq[HttpHost],
+ basicAuth: Option[(String, String)] = None): RestClient = {
try {
- RestClient.builder(hosts: _*).build()
+ val newClient = _sharedRestClient match {
+ case Some(c) => c
+ case None => {
+ var builder = RestClient.builder(hosts: _*)
+          builder = basicAuth.fold(builder) {
+            case (username, password) => builder.setHttpClientConfigCallback(
+              new BasicAuthProvider(username, password))
+          }
+ builder.build()
+ }
+ }
+ _sharedRestClient = Some(newClient)
+ newClient
} catch {
case e: Throwable =>
throw new StorageClientException(e.getMessage, e)
}
}
+
+ def close(): Unit = {
+ _sharedRestClient.foreach { client =>
+ client.close()
+ _sharedRestClient = None
+ }
+ }
}
-class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
- with Logging {
+class StorageClient(val config: StorageClientConfig)
+ extends BaseStorageClient with Logging {
+
override val prefix = "ES"
- val client = ESClient(ESUtils.getHttpHosts(config))
+ val usernamePassword = (
+ config.properties.get("USERNAME"),
+ config.properties.get("PASSWORD"))
+ val optionalBasicAuth: Option[(String, String)] = usernamePassword match {
+ case (None, None) => None
+ case (username, password) => Some(
+ (username.getOrElse(""), password.getOrElse("")))
+ }
+
+ CleanupFunctions.add { ESClient.close }
+
+ val client = ESClient.open(ESUtils.getHttpHosts(config), optionalBasicAuth)
+}
+
+class BasicAuthProvider(
+ val username: String,
+ val password: String)
+ extends HttpClientConfigCallback {
+
+ val credentialsProvider = new BasicCredentialsProvider()
+ credentialsProvider.setCredentials(
+ AuthScope.ANY,
+ new UsernamePasswordCredentials(username, password))
+
+ override def customizeHttpClient(
+ httpClientBuilder: HttpAsyncClientBuilder
+ ): HttpAsyncClientBuilder = {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+ }
}
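
The new authentication path is driven entirely by the storage source's
optional USERNAME and PASSWORD properties (presumably surfaced through
pio-env.sh as PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME and ..._PASSWORD; the
exact variable names are an assumption here). Outside of StorageClient, the
callback can also be wired up directly; a minimal sketch with placeholder host
and credentials:

    import org.apache.http.HttpHost
    import org.apache.http.util.EntityUtils
    import org.apache.predictionio.data.storage.elasticsearch.BasicAuthProvider
    import org.elasticsearch.client.RestClient

    object AuthSketch {
      def main(args: Array[String]): Unit = {
        // BasicAuthProvider is the HttpClientConfigCallback added above.
        val client = RestClient.builder(new HttpHost("localhost", 9200))
          .setHttpClientConfigCallback(new BasicAuthProvider("elastic", "changeme"))
          .build()
        try {
          // A root GET is enough to verify the credentials are accepted.
          val response = client.performRequest("GET", "/")
          println(EntityUtils.toString(response.getEntity))
        } finally {
          client.close()
        }
      }
    }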
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
new file mode 100644
index 0000000..91cc4d6
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClientSpec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import org.apache.predictionio.data.storage.{App, Apps, Storage, StorageClientConfig}
+import org.elasticsearch.client.{RestClient, Response}
+import scala.collection.JavaConverters._
+
+import org.specs2._
+import org.specs2.specification.Step
+
+class ElasticsearchStorageClientSpec extends Specification {
+ def is = s2"""
+
+ PredictionIO Storage Elasticsearch REST Client Specification ${getESClient}
+
+ """
+
+ def getESClient = sequential ^ s2"""
+
+ StorageClient should
+ - initialize metadata store ${initMetadataStore(appsDO)}
+
+ """
+
+ def initMetadataStore(appsDO: Apps) = sequential ^ s2"""
+
+ creates an app ${createsApp(appsDO)}
+ gets apps ${getApps(appsDO)}
+
+ """
+
+ val indexName = "test_pio_storage_meta_" + hashCode
+
+ def appsDO: Apps = Storage.getDataObject[Apps](StorageTestUtils.elasticsearchSourceName, indexName)
+
+ def createsApp(appsDO: Apps) = {
+ val newId: Int = 123
+ val newApp: App = App(newId, "test1", Some("App for ElasticsearchStorageClientSpec"))
+ val id: Option[Int] = appsDO.insert(newApp)
+ val createdApp: Option[App] = appsDO.get(id.get)
+ createdApp.get.id mustEqual newId
+ }
+
+ def getApps(appsDO: Apps) = {
+ val apps: Seq[App] = appsDO.getAll()
+ println(s"Storage.config ${Storage.config}")
+ println(s"getApps ${apps}")
+ apps must beAnInstanceOf[Seq[App]]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
----------------------------------------------------------------------
diff --git a/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
new file mode 100644
index 0000000..f891c58
--- /dev/null
+++ b/storage/elasticsearch/src/test/scala/org/apache/predictionio/data/storage/elasticsearch/StorageTestUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+object StorageTestUtils {
+ val elasticsearchSourceName = "ELASTICSEARCH"
+
+ def dropESIndex(namespace: String): Unit = {
+ // TODO
+ }
+
+}
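
The dropESIndex TODO could presumably be finished with one DELETE request
through the shared client added in this commit; a hedged sketch (same package
as ESClient, host hard-coded as a placeholder):

    package org.apache.predictionio.data.storage.elasticsearch

    import org.apache.http.HttpHost

    object DropIndexSketch {
      def dropESIndex(namespace: String): Unit = {
        val client = ESClient.open(Seq(new HttpHost("localhost", 9200)))
        // DELETE /<index> removes the whole index, which is adequate teardown
        // for the throwaway test indices named above.
        client.performRequest("DELETE", s"/$namespace")
      }
    }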
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/storage/hdfs/project/build.properties
----------------------------------------------------------------------
diff --git a/storage/hdfs/project/build.properties b/storage/hdfs/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ b/storage/hdfs/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
index c101d3f..0372a44 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/export/EventsToFile.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.SparkVersionDependent
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
import grizzled.slf4j.Logging
import org.apache.spark.sql.SaveMode
@@ -69,40 +70,45 @@ object EventsToFile extends Logging {
}
}
parser.parse(args, EventsToFileArgs()) map { args =>
- // get channelId
- val channels = Storage.getMetaDataChannels
- val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+ try {
+ // get channelId
+ val channels = Storage.getMetaDataChannels
+ val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
- val channelId: Option[Int] = args.channel.map { ch =>
- if (!channelMap.contains(ch)) {
- error(s"Channel ${ch} doesn't exist in this app.")
- sys.exit(1)
+ val channelId: Option[Int] = args.channel.map { ch =>
+ if (!channelMap.contains(ch)) {
+ error(s"Channel ${ch} doesn't exist in this app.")
+ sys.exit(1)
+ }
+
+ channelMap(ch)
}
- channelMap(ch)
- }
+ val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
- val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+ WorkflowUtils.modifyLogging(verbose = args.verbose)
+ @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+ new EventJson4sSupport.APISerializer
+ val sc = WorkflowContext(
+ mode = "Export",
+ batch = "App ID " + args.appId + channelStr,
+ executorEnv = Runner.envStringToMap(args.env))
+ val sqlSession = SparkVersionDependent.sqlSession(sc)
+ val events = Storage.getPEvents()
+ val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
+ val jsonStringRdd = eventsRdd.map(write(_))
+ if (args.format == "json") {
+ jsonStringRdd.saveAsTextFile(args.outputPath)
+ } else {
+ val jsonDf = sqlSession.read.json(jsonStringRdd)
+ jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+ }
+ info(s"Events are exported to ${args.outputPath}/.")
+ info("Done.")
- WorkflowUtils.modifyLogging(verbose = args.verbose)
- @transient lazy implicit val formats = Utils.json4sDefaultFormats +
- new EventJson4sSupport.APISerializer
- val sc = WorkflowContext(
- mode = "Export",
- batch = "App ID " + args.appId + channelStr,
- executorEnv = Runner.envStringToMap(args.env))
- val sqlSession = SparkVersionDependent.sqlSession(sc)
- val events = Storage.getPEvents()
- val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc)
- val jsonStringRdd = eventsRdd.map(write(_))
- if (args.format == "json") {
- jsonStringRdd.saveAsTextFile(args.outputPath)
- } else {
- val jsonDf = sqlSession.read.json(jsonStringRdd)
- jsonDf.write.mode(SaveMode.ErrorIfExists).parquet(args.outputPath)
+ } finally {
+ CleanupFunctions.run()
}
- info(s"Events are exported to ${args.outputPath}/.")
- info("Done.")
}
}
}
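
The restructure above keeps the two output modes intact: "json" writes the
events one JSON document per line, anything else round-trips them through a
DataFrame to Parquet. A standalone sketch of that branch, with local paths and
sample events as placeholders:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object ExportSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .master("local[*]").appName("ExportSketch").getOrCreate()
        val jsonStrings = spark.sparkContext.parallelize(Seq(
          """{"event": "rate", "entityId": "u1"}""",
          """{"event": "buy", "entityId": "u2"}"""))
        // "json" mode: one JSON document per line, written as plain text.
        jsonStrings.saveAsTextFile("/tmp/events-json")
        // Parquet mode: infer a schema from the JSON, fail if the target exists.
        val df = spark.read.json(jsonStrings)
        df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/events-parquet")
        spark.stop()
      }
    }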
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/bf84ede6/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
index 4b333ab..11f5a52 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/imprt/FileToEvents.scala
@@ -25,6 +25,7 @@ import org.apache.predictionio.data.storage.Storage
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.CleanupFunctions
import grizzled.slf4j.Logging
import org.json4s.native.Serialization._
@@ -66,41 +67,46 @@ object FileToEvents extends Logging {
}
}
parser.parse(args, FileToEventsArgs()) map { args =>
- // get channelId
- val channels = Storage.getMetaDataChannels
- val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
+ try {
+ // get channelId
+ val channels = Storage.getMetaDataChannels
+ val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap
- val channelId: Option[Int] = args.channel.map { ch =>
- if (!channelMap.contains(ch)) {
- error(s"Channel ${ch} doesn't exist in this app.")
- sys.exit(1)
+ val channelId: Option[Int] = args.channel.map { ch =>
+ if (!channelMap.contains(ch)) {
+ error(s"Channel ${ch} doesn't exist in this app.")
+ sys.exit(1)
+ }
+
+ channelMap(ch)
}
- channelMap(ch)
- }
+ val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
- val channelStr = args.channel.map(n => " Channel " + n).getOrElse("")
+ WorkflowUtils.modifyLogging(verbose = args.verbose)
+ @transient lazy implicit val formats = Utils.json4sDefaultFormats +
+ new EventJson4sSupport.APISerializer
+ val sc = WorkflowContext(
+ mode = "Import",
+ batch = "App ID " + args.appId + channelStr,
+ executorEnv = Runner.envStringToMap(args.env))
+ val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
+ Try(read[Event](json)).recoverWith {
+ case e: Throwable =>
+ error(s"\nmalformed json => $json")
+ Failure(e)
+ }.get
+ }
+ val events = Storage.getPEvents()
+ events.write(events = rdd,
+ appId = args.appId,
+ channelId = channelId)(sc)
+ info("Events are imported.")
+ info("Done.")
- WorkflowUtils.modifyLogging(verbose = args.verbose)
- @transient lazy implicit val formats = Utils.json4sDefaultFormats +
- new EventJson4sSupport.APISerializer
- val sc = WorkflowContext(
- mode = "Import",
- batch = "App ID " + args.appId + channelStr,
- executorEnv = Runner.envStringToMap(args.env))
- val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json =>
- Try(read[Event](json)).recoverWith {
- case e: Throwable =>
- error(s"\nmalformed json => $json")
- Failure(e)
- }.get
+ } finally {
+ CleanupFunctions.run()
}
- val events = Storage.getPEvents()
- events.write(events = rdd,
- appId = args.appId,
- channelId = channelId)(sc)
- info("Events are imported.")
- info("Done.")
}
}
}
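
Both command-line drivers above now share the same shape: do the work inside
try, then fire the registered cleanup callbacks exactly once in finally. A
minimal sketch of that contract, assuming CleanupFunctions.add accepts a
zero-argument thunk as the CleanupFunctions.add { ESClient.close } call in
StorageClient.scala suggests:

    import org.apache.predictionio.workflow.CleanupFunctions

    object DriverSketch {
      // Stand-in for a real teardown such as ESClient.close.
      def closeResources(): Unit = println("closing shared resources")

      def main(args: Array[String]): Unit = {
        CleanupFunctions.add { closeResources }
        try {
          println("driver work happens here")
        } finally {
          // Runs every registered callback, even if the work above throws.
          CleanupFunctions.run()
        }
      }
    }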