Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:29:52 UTC
[08/25] incubator-s2graph git commit: start working on ModelServing examples.
start working on ModelServing examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/130fed26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/130fed26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/130fed26
Branch: refs/heads/master
Commit: 130fed262f5a4a529fed764103b5ee275a10e508
Parents: 8696d15
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 3 18:07:46 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 3 18:07:46 2018 +0900
----------------------------------------------------------------------
example/movielens/jobdesc.template | 43 +++++++-
example/run.sh | 3 +
.../apache/s2graph/s2jobs/JobDescription.scala | 4 +-
.../custom/process/ALSAnnoyBuildProcess.scala | 87 ---------------
.../task/custom/process/ALSModelProcess.scala | 110 +++++++++++++++++++
.../process/ALSAnnoyBuildProcessTest.scala | 104 ------------------
.../custom/process/ALSModelProcessTest.scala | 104 ++++++++++++++++++
7 files changed, 260 insertions(+), 195 deletions(-)
----------------------------------------------------------------------
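In short: ALSAnnoyBuildProcess and its test are replaced by ALSModelProcess, which separates ALS model training (run as a "custom" process task) from Annoy index construction (run as a new "annoy" sink), and the movielens job description gains the two corresponding tasks.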
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index 0b0f0ad..13ef1cd 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -56,7 +56,7 @@
],
"type": "sql",
"options": {
- "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS LONG) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
+ "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS INT) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
}
},
{
@@ -66,7 +66,7 @@
],
"type": "sql",
"options": {
- "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
+ "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
}
},
{
@@ -76,7 +76,7 @@
],
"type": "sql",
"options": {
- "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
+ "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
}
},
{
@@ -89,6 +89,32 @@
"options": {
"sql": "SELECT * FROM edge_rated UNION SELECT * FROM edge_tagged"
}
+ },
+ {
+ "name": "build_als_input",
+ "inputs": [
+ "edge_rated"
+ ],
+ "type": "sql",
+ "options": {
+ "sql": "SELECT \n`from` as userId, `to` as movieId, 1.0 as rating FROM edge_rated"
+ }
+ },
+ {
+ "name": "factorize_rating",
+ "inputs": [
+ "build_als_input"
+ ],
+ "type": "custom",
+ "options": {
+ "class": "org.apache.s2graph.s2jobs.task.custom.process.ALSModelProcess",
+ "rank": "10",
+ "maxIter": "5",
+ "regParam": "0.01",
+ "userCol": "userId",
+ "itemCol": "movieId",
+ "ratingCol": "rating"
+ }
}
],
"sink": [
@@ -117,6 +143,17 @@
"s2.spark.sql.streaming.sink.grouped.size": "10",
"s2.spark.sql.streaming.sink.wait.time": "10"
}
+ },
+ {
+ "name": "annoy_index_build",
+ "inputs": [
+ "factorize_rating"
+ ],
+ "type": "annoy",
+ "options": {
+ "itemFactors": "/tmp/itemFactors",
+ "path": "/tmp/annoy_result"
+ }
}
]
}
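For context on the LONG-to-INT casts above: Spark ML's ALS keeps user and item ids as Int internally, so the id columns handed to it must be integer-valued (ids outside integer range fail at fit time). Below is a minimal, self-contained sketch of the same ALS configuration the "factorize_rating" task names; the toy data is illustrative, not from the commit.

    import org.apache.spark.ml.recommendation.ALS
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("als-sketch").getOrCreate()
    import spark.implicits._

    // Toy rows in the build_als_input shape: (userId, movieId, rating).
    val ratings = Seq((1, 10, 1.0), (1, 20, 1.0), (2, 10, 1.0), (2, 30, 1.0))
      .toDF("userId", "movieId", "rating")

    val als = new ALS()
      .setRank(10)        // "rank": "10"
      .setMaxIter(5)      // "maxIter": "5"
      .setRegParam(0.01)  // "regParam": "0.01"
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")

    val model = als.fit(ratings)
    // itemFactors has columns id: Int and features: Array[Float];
    // this is the DataFrame the "annoy" sink above consumes.
    model.itemFactors.show(truncate = false)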
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/run.sh
----------------------------------------------------------------------
diff --git a/example/run.sh b/example/run.sh
index fd324ac..5256974 100644
--- a/example/run.sh
+++ b/example/run.sh
@@ -41,3 +41,6 @@ q "Finally, we import example data to service"
sh ./import_data.sh $SERVICE
[ $? -ne 0 ] && { exit -1; }
+#q "Run ML Model into S2Graph by importing Model."
+#sh ./import_model.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 6abbe86..dc32bc5 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
import play.api.libs.json.{JsValue, Json}
import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.task.custom.process.AnnoyIndexBuildSink
case class JobDescription(
name:String,
@@ -65,7 +66,7 @@ object JobDescription extends Logger {
logger.debug(s"custom class init.. $customClass")
Class.forName(customClass)
- .getConstructor(TaskConf.getClass)
+ .getConstructor(classOf[TaskConf])
.newInstance(conf)
.asInstanceOf[task.Process]
@@ -82,6 +83,7 @@ object JobDescription extends Logger {
case "file" => new FileSink(jobName, conf)
case "es" => new ESSink(jobName, conf)
case "s2graph" => new S2GraphSink(jobName, conf)
+ case "annoy" => new AnnoyIndexBuildSink(jobName, conf)
case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}")
}
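The one-line constructor fix above matters because, in Scala, TaskConf.getClass evaluates to the class of the TaskConf companion object (TaskConf$), not of the case class itself, so getConstructor could never match a constructor taking a TaskConf. classOf[TaskConf] is the class literal reflection needs. A small sketch of the distinction, with illustrative names:

    // Conf and MyProcess stand in for TaskConf and a custom process class.
    case class Conf(options: Map[String, String])

    class MyProcess(conf: Conf) {
      def name: String = conf.options.getOrElse("name", "unknown")
    }

    // Conf.getClass  => the companion object's class, Conf$ (wrong here).
    // classOf[Conf]  => the constructor's parameter type, Conf (right).
    val process = classOf[MyProcess]
      .getConstructor(classOf[Conf])
      .newInstance(Conf(Map("name" -> "demo")))

    println(process.name) // prints: demo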
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
deleted file mode 100644
index 968bf5a..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import annoy4s.{Angular, Annoy}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-object ALSAnnoyBuildProcess {
-
- def buildAnnoyIndex(ss: SparkSession,
- conf: TaskConf,
- dataFrame: DataFrame): Unit = {
- // annoy tree params.
- val outputPath = conf.options("outputPath")
- val localInputPath = conf.options("localInputPath")
- val localIndexPath = conf.options("localIndexPath")
- val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
-
- // als model params.
- val rank = conf.options.getOrElse("rank", numDimensions.toString).toInt
- val maxIter = conf.options.getOrElse("maxIter", "5").toInt
- val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
- val userCol = conf.options.getOrElse("userCol", "userId")
- val itemCol = conf.options.getOrElse("itemCol", "movieId")
- val ratingCol = conf.options.getOrElse("ratingCol", "rating")
-
- assert(rank == numDimensions)
-
- val als = new ALS()
- .setRank(rank)
- .setMaxIter(maxIter)
- .setRegParam(regParam)
- .setUserCol(userCol)
- .setItemCol(itemCol)
- .setRatingCol(ratingCol)
-
- val model = als.fit(dataFrame)
-
- saveFeatures(ss, model.itemFactors, outputPath)
- copyToLocal(ss.sparkContext.hadoopConfiguration, outputPath, localInputPath)
-
- FileUtil.fullyDelete(new File(localIndexPath))
-
- Annoy.create[Int](s"${localInputPath}", numDimensions, outputDir = s"$localIndexPath", Angular)
- }
-
- def saveFeatures(ss: SparkSession,
- dataFrame: DataFrame,
- outputPath: String,
- idCol: String = "id",
- featuresCol: String = "features"): Unit = {
- import ss.sqlContext.implicits._
-
- val result = dataFrame.map { row =>
- val id = row.getAs[Int](idCol)
- val vector = row.getAs[Seq[Float]](featuresCol)
- (Seq(id) ++ vector).mkString(" ")
- }
-
- result.write.mode(SaveMode.Overwrite).csv(outputPath)
- }
-
- def copyToLocal(configuration: Configuration,
- remoteInputPath: String,
- localOutputPath: String,
- merge: Boolean = true): Unit = {
- val fs = FileSystem.get(configuration)
- val localFs = FileSystem.getLocal(configuration)
- localFs.deleteOnExit(new Path(localOutputPath))
-
- if (merge)
- FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, new Path(localOutputPath), false, configuration, "")
- else
- fs.copyToLocalFile(new Path(remoteInputPath), new Path(localOutputPath))
- }
-}
-class ALSAnnoyBuildProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
- override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = ???
-
- override def mandatoryOptions: Set[String] = Set("outputPath")
-
-
-}
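Note the rename: the ALS-training half of the deleted ALSAnnoyBuildProcess reappears below as ALSModelProcess.runALS, while the index-building half moves into a dedicated sink (AnnoyIndexBuildSink), with the old outputPath/localInputPath/localIndexPath options replaced by itemFactors, tempPath, and path.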
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
new file mode 100644
index 0000000..26ef6ad
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import annoy4s.{Angular, Annoy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.s2graph.s2jobs.task.{Sink, TaskConf}
+import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+object ALSModelProcess {
+
+ def runALS(ss: SparkSession,
+ conf: TaskConf,
+ dataFrame: DataFrame): DataFrame = {
+ // als model params.
+ val rank = conf.options.getOrElse("rank", "10").toInt
+ val maxIter = conf.options.getOrElse("maxIter", "5").toInt
+ val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
+ val userCol = conf.options.getOrElse("userCol", "userId")
+ val itemCol = conf.options.getOrElse("itemCol", "movieId")
+ val ratingCol = conf.options.getOrElse("ratingCol", "rating")
+
+// assert(rank == numDimensions)
+
+ val als = new ALS()
+ .setRank(rank)
+ .setMaxIter(maxIter)
+ .setRegParam(regParam)
+ .setUserCol(userCol)
+ .setItemCol(itemCol)
+ .setRatingCol(ratingCol)
+
+ val model = als.fit(dataFrame)
+
+ model.itemFactors
+ }
+
+ def buildAnnoyIndex(conf: TaskConf,
+ dataFrame: DataFrame): Unit = {
+ // annoy tree params.
+ val itemFactorsPath = conf.options("itemFactors")
+ val tempPath = conf.options.getOrElse("tempPath", "/tmp")
+
+ val tempInputPath = tempPath + "/_tmp"
+
+ val annoyResultPath = conf.options("path")
+ val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
+
+ saveFeatures(dataFrame, itemFactorsPath)
+ copyToLocal(dataFrame.sparkSession.sparkContext.hadoopConfiguration, itemFactorsPath, tempInputPath)
+
+ FileUtil.fullyDelete(new File(annoyResultPath))
+
+ Annoy.create[Int](s"${tempInputPath}", numDimensions, outputDir = s"$annoyResultPath", Angular)
+ }
+
+ def saveFeatures(dataFrame: DataFrame,
+ outputPath: String,
+ idCol: String = "id",
+ featuresCol: String = "features"): Unit = {
+
+ import dataFrame.sparkSession.implicits._
+
+ val result = dataFrame.map { row =>
+ val id = row.getAs[Int](idCol)
+ val vector = row.getAs[Seq[Float]](featuresCol)
+ (Seq(id) ++ vector).mkString(" ")
+ }
+
+ result.write.mode(SaveMode.Overwrite).csv(outputPath)
+ }
+
+ def copyToLocal(configuration: Configuration,
+ remoteInputPath: String,
+ localOutputPath: String,
+ merge: Boolean = true): Unit = {
+ val fs = FileSystem.get(configuration)
+ val localFs = FileSystem.getLocal(configuration)
+ localFs.deleteOnExit(new Path(localOutputPath))
+
+ if (merge)
+ FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, new Path(localOutputPath), false, configuration, "")
+ else
+ fs.copyToLocalFile(new Path(remoteInputPath), new Path(localOutputPath))
+ }
+}
+
+class ALSModelProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
+ override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+ ALSModelProcess.runALS(ss, conf, inputMap.head._2)
+ }
+ override def mandatoryOptions: Set[String] = Set.empty
+}
+
+class AnnoyIndexBuildSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+ override val FORMAT: String = "parquet"
+
+ override def mandatoryOptions: Set[String] = Set("path", "itemFactors")
+
+ override def write(inputDF: DataFrame): Unit = {
+ val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+ if (inputDF.isStreaming) throw new IllegalStateException("AnnoyIndexBuildSink can not be run as streaming.")
+ else {
+ ALSModelProcess.buildAnnoyIndex(conf, df)
+ }
+ }
+}
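Taken together, the new file splits the old pipeline in two: ALSModelProcess.execute fits ALS and returns itemFactors, and AnnoyIndexBuildSink serializes those factors as space-separated "id f1 f2 ..." text (saveFeatures), merges the part files to a local temp file (FileUtil.copyMerge, available in Hadoop 2.x), and hands the merged file to annoy4s's Annoy.create. Here is a sketch of driving the pair directly, assuming the TaskConf(name, type, inputs, options) argument order used in the test below; the paths and the toy DataFrame are illustrative:

    import org.apache.s2graph.s2jobs.task.TaskConf
    import org.apache.s2graph.s2jobs.task.custom.process.{ALSModelProcess, AnnoyIndexBuildSink}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("annoy-sketch").getOrCreate()
    import spark.implicits._

    val ratings = Seq((1, 10, 1.0), (1, 20, 1.0), (2, 10, 1.0))
      .toDF("userId", "movieId", "rating")

    // 1. The custom process task: fit ALS, hand back itemFactors.
    val processConf = TaskConf("factorize_rating", "custom", Seq("build_als_input"),
      Map("rank" -> "10", "maxIter" -> "5", "regParam" -> "0.01"))
    val itemFactors = new ALSModelProcess(processConf)
      .execute(spark, Map("build_als_input" -> ratings))

    // 2. The annoy sink: persist the factors and build the index under "path".
    val sinkConf = TaskConf("annoy_index_build", "annoy", Seq("factorize_rating"),
      Map("itemFactors" -> "/tmp/itemFactors", "path" -> "/tmp/annoy_result"))
    new AnnoyIndexBuildSink("movielens", sinkConf).write(itemFactors)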
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
deleted file mode 100644
index 6279e82..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.{Query, QueryParam}
-import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.s2jobs.task.TaskConf
-
-import scala.concurrent.{Await, ExecutionContext}
-import scala.concurrent.duration.Duration
-import scala.io.Source
-
-class ALSAnnoyBuildProcessTest extends IntegrateCommon with DataFrameSuiteBase {
- import scala.collection.JavaConverters._
-
- // this test require adding movie lens rating data(u.data, movie.txt) under resources
- // so ignore for now until figure out how to automate download dataset.
- ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
- import spark.sqlContext.implicits._
- val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
-
- val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
- val tokens = line.split("\t")
- (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
- }.toDF("userId", "movieId", "rating")
-
- val outputPath = "/tmp"
- val localInputPath = "/tmp/annoy_input"
- val localIndexPath = "/tmp/annoy_result"
-
- val taskOptions = Map(
- "outputPath" -> outputPath,
- "localInputPath" -> localInputPath,
- "localIndexPath" -> localIndexPath
- )
-
- val conf = TaskConf("test", "test", Nil, taskOptions)
- ALSAnnoyBuildProcess.buildAnnoyIndex(spark, conf, ratings)
-
- val labelName = "annoy_model_fetcher_test"
-
- val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
- val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
-
- val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
- val serviceColumn =
- management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
-
- val options = s"""{
- | "importer": {
- | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
- | },
- | "fetcher": {
- | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
- | "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
- | "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
- | "${AnnoyModelFetcher.DimensionKey}": 10
- | }
- |}""".stripMargin
-
- Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
- val label = management.createLabel(
- labelName,
- serviceColumn,
- serviceColumn,
- true,
- service.serviceName,
- Seq.empty[Index].asJava,
- Seq.empty[Prop].asJava,
- "strong",
- null,
- -1,
- "v3",
- "gz",
- options
- )
- val config = ConfigFactory.parseString(options)
- val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
- Await.result(importerFuture, Duration("3 minutes"))
-
- Thread.sleep(10000)
-
- val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
- val queryParam = QueryParam(labelName = labelName, limit = 5)
-
- val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
- val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-
- stepResult.edgeWithScores.foreach { es =>
- println(es.edge.tgtVertex.innerIdVal)
- }
-
- // clean up temp directory.
- FileUtils.deleteDirectory(new File(outputPath))
- }
-
-}
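The replacement test below carries this scenario over verbatim, but commented out; note that the commented code still calls the old three-argument buildAnnoyIndex(spark, conf, ratings) and the old outputPath/localInputPath/localIndexPath options, so it would need updating to the new buildAnnoyIndex(conf, dataFrame) signature before being re-enabled.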
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
new file mode 100644
index 0000000..d16ebf0
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@ -0,0 +1,104 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.{Query, QueryParam}
+import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
+class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
+ import scala.collection.JavaConverters._
+
+ // this test requires adding MovieLens rating data (u.data, movie.txt) under resources,
+ // so ignore it for now until we figure out how to automate downloading the dataset.
+// ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
+// import spark.sqlContext.implicits._
+// val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
+//
+// val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
+// val tokens = line.split("\t")
+// (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
+// }.toDF("userId", "movieId", "rating")
+//
+// val outputPath = "/tmp"
+// val localInputPath = "/tmp/annoy_input"
+// val localIndexPath = "/tmp/annoy_result"
+//
+// val taskOptions = Map(
+// "outputPath" -> outputPath,
+// "localInputPath" -> localInputPath,
+// "localIndexPath" -> localIndexPath
+// )
+//
+// val conf = TaskConf("test", "test", Nil, taskOptions)
+// ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
+//
+// val labelName = "annoy_model_fetcher_test"
+//
+// val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
+// val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
+//
+// val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+// val serviceColumn =
+// management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+//
+// val options = s"""{
+// | "importer": {
+// | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
+// | },
+// | "fetcher": {
+// | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
+// | "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
+// | "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
+// | "${AnnoyModelFetcher.DimensionKey}": 10
+// | }
+// |}""".stripMargin
+//
+// Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+//
+// val label = management.createLabel(
+// labelName,
+// serviceColumn,
+// serviceColumn,
+// true,
+// service.serviceName,
+// Seq.empty[Index].asJava,
+// Seq.empty[Prop].asJava,
+// "strong",
+// null,
+// -1,
+// "v3",
+// "gz",
+// options
+// )
+// val config = ConfigFactory.parseString(options)
+// val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+// Await.result(importerFuture, Duration("3 minutes"))
+//
+// Thread.sleep(10000)
+//
+// val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
+// val queryParam = QueryParam(labelName = labelName, limit = 5)
+//
+// val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+// val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+//
+// stepResult.edgeWithScores.foreach { es =>
+// println(es.edge.tgtVertex.innerIdVal)
+// }
+//
+// // clean up temp directory.
+// FileUtils.deleteDirectory(new File(outputPath))
+// }
+
+}