You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:29:52 UTC

[08/25] incubator-s2graph git commit: start working on ModelServing examples.

start working on ModelServing examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/130fed26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/130fed26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/130fed26

Branch: refs/heads/master
Commit: 130fed262f5a4a529fed764103b5ee275a10e508
Parents: 8696d15
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 3 18:07:46 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 3 18:07:46 2018 +0900

----------------------------------------------------------------------
 example/movielens/jobdesc.template              |  43 +++++++-
 example/run.sh                                  |   3 +
 .../apache/s2graph/s2jobs/JobDescription.scala  |   4 +-
 .../custom/process/ALSAnnoyBuildProcess.scala   |  87 ---------------
 .../task/custom/process/ALSModelProcess.scala   | 110 +++++++++++++++++++
 .../process/ALSAnnoyBuildProcessTest.scala      | 104 ------------------
 .../custom/process/ALSModelProcessTest.scala    | 104 ++++++++++++++++++
 7 files changed, 260 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index 0b0f0ad..13ef1cd 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -56,7 +56,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS LONG) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
+        "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS INT) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
       }
     },
     {
@@ -66,7 +66,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
+        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
       }
     },
     {
@@ -76,7 +76,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
+        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
       }
     },
     {
@@ -89,6 +89,32 @@
       "options": {
         "sql": "SELECT * FROM edge_rated UNION SELECT * FROM edge_tagged"
       }
+    },
+    {
+      "name": "build_als_input",
+      "inputs": [
+        "edge_rated"
+      ],
+      "type": "sql",
+      "options": {
+        "sql": "SELECT \n`from` as userId, `to` as movieId, 1.0 as rating FROM edge_rated"
+      }
+    },
+    {
+      "name": "factorize_rating",
+      "inputs": [
+        "build_als_input"
+      ],
+      "type": "custom",
+      "options": {
+        "class": "org.apache.s2graph.s2jobs.task.custom.process.ALSModelProcess",
+        "rank": "10",
+        "maxIter": "5",
+        "regParam": "0.01",
+        "userCol": "userId",
+        "itemCol": "movieId",
+        "ratingCol": "rating"
+      }
     }
   ],
   "sink": [
@@ -117,6 +143,17 @@
         "s2.spark.sql.streaming.sink.grouped.size": "10",
         "s2.spark.sql.streaming.sink.wait.time": "10"
       }
+    },
+    {
+      "name": "annoy_index_build",
+      "inputs": [
+        "factorize_rating"
+      ],
+      "type": "annoy",
+      "options": {
+        "itemFactors": "/tmp/itemFactors",
+        "path": "/tmp/annoy_result"
+      }
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/run.sh
----------------------------------------------------------------------
diff --git a/example/run.sh b/example/run.sh
index fd324ac..5256974 100644
--- a/example/run.sh
+++ b/example/run.sh
@@ -41,3 +41,6 @@ q "Finally, we import example data to service"
 sh ./import_data.sh $SERVICE
 [ $? -ne 0 ] && { exit -1; }
 
+#q "Run ML Model into S2Graph by importing Model."
+#sh ./import_model.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 6abbe86..dc32bc5 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.task.custom.process.AnnoyIndexBuildSink
 
 case class JobDescription(
                            name:String,
@@ -65,7 +66,7 @@ object JobDescription extends Logger {
             logger.debug(s"custom class init.. $customClass")
 
             Class.forName(customClass)
-              .getConstructor(TaskConf.getClass)
+              .getConstructor(classOf[TaskConf])
               .newInstance(conf)
               .asInstanceOf[task.Process]
 
@@ -82,6 +83,7 @@ object JobDescription extends Logger {
       case "file" => new FileSink(jobName, conf)
       case "es" => new ESSink(jobName, conf)
       case "s2graph" => new S2GraphSink(jobName, conf)
+      case "annoy" => new AnnoyIndexBuildSink(jobName, conf)
       case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}")
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
deleted file mode 100644
index 968bf5a..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import annoy4s.{Angular, Annoy}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-object ALSAnnoyBuildProcess {
-
-  def buildAnnoyIndex(ss: SparkSession,
-                      conf: TaskConf,
-                      dataFrame: DataFrame): Unit = {
-    // annoy tree params.
-    val outputPath = conf.options("outputPath")
-    val localInputPath = conf.options("localInputPath")
-    val localIndexPath = conf.options("localIndexPath")
-    val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
-
-    // als model params.
-    val rank = conf.options.getOrElse("rank", numDimensions.toString).toInt
-    val maxIter = conf.options.getOrElse("maxIter", "5").toInt
-    val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
-    val userCol = conf.options.getOrElse("userCol", "userId")
-    val itemCol = conf.options.getOrElse("itemCol", "movieId")
-    val ratingCol = conf.options.getOrElse("ratingCol", "rating")
-
-    assert(rank == numDimensions)
-
-    val als = new ALS()
-      .setRank(rank)
-      .setMaxIter(maxIter)
-      .setRegParam(regParam)
-      .setUserCol(userCol)
-      .setItemCol(itemCol)
-      .setRatingCol(ratingCol)
-
-    val model = als.fit(dataFrame)
-
-    saveFeatures(ss, model.itemFactors, outputPath)
-    copyToLocal(ss.sparkContext.hadoopConfiguration, outputPath, localInputPath)
-
-    FileUtil.fullyDelete(new File(localIndexPath))
-
-    Annoy.create[Int](s"${localInputPath}", numDimensions, outputDir = s"$localIndexPath", Angular)
-  }
-
-  def saveFeatures(ss: SparkSession,
-                   dataFrame: DataFrame,
-                   outputPath: String,
-                   idCol: String = "id",
-                   featuresCol: String = "features"): Unit = {
-    import ss.sqlContext.implicits._
-
-    val result = dataFrame.map { row =>
-      val id = row.getAs[Int](idCol)
-      val vector = row.getAs[Seq[Float]](featuresCol)
-      (Seq(id) ++ vector).mkString(" ")
-    }
-
-    result.write.mode(SaveMode.Overwrite).csv(outputPath)
-  }
-
-  def copyToLocal(configuration: Configuration,
-                  remoteInputPath: String,
-                  localOutputPath: String,
-                  merge: Boolean = true): Unit  = {
-    val fs = FileSystem.get(configuration)
-    val localFs = FileSystem.getLocal(configuration)
-    localFs.deleteOnExit(new Path(localOutputPath))
-
-    if (merge)
-      FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, new Path(localOutputPath), false, configuration, "")
-    else
-      fs.copyToLocalFile(new Path(remoteInputPath), new Path(localOutputPath))
-  }
-}
-class ALSAnnoyBuildProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = ???
-
-  override def mandatoryOptions: Set[String] = Set("outputPath")
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
new file mode 100644
index 0000000..26ef6ad
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import annoy4s.{Angular, Annoy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.s2graph.s2jobs.task.{Sink, TaskConf}
+import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+/** Helpers shared by ALSModelProcess and AnnoyIndexBuildSink: fit ALS, dump factors, build Annoy index. */
+object ALSModelProcess {
+
+  /** Fits ALS on `dataFrame` with options read from `conf` and returns the model's item factors.
+    * Options (defaults): rank (10), maxIter (5), regParam (0.01), userCol (userId),
+    * itemCol (movieId), ratingCol (rating). `ss` is not used by the body. */
+  def runALS(ss: SparkSession, conf: TaskConf, dataFrame: DataFrame): DataFrame = {
+    val rank = conf.options.getOrElse("rank", "10").toInt
+    val maxIter = conf.options.getOrElse("maxIter", "5").toInt
+    val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
+    val userCol = conf.options.getOrElse("userCol", "userId")
+    val itemCol = conf.options.getOrElse("itemCol", "movieId")
+    val ratingCol = conf.options.getOrElse("ratingCol", "rating")
+
+    val als = new ALS()
+      .setRank(rank)
+      .setMaxIter(maxIter)
+      .setRegParam(regParam)
+      .setUserCol(userCol)
+      .setItemCol(itemCol)
+      .setRatingCol(ratingCol)
+
+    als.fit(dataFrame).itemFactors
+  }
+
+  /** Saves `dataFrame` to the `itemFactors` path, copies it (merged) to `<tempPath>/_tmp`
+    * (`tempPath` defaults to /tmp), clears the `path` directory and runs Annoy.create into it.
+    * `dimensions` (default 10) is forwarded as the second argument of Annoy.create.
+    * NOTE(review): confirm that annoy4s reads that argument as a dimension, not a tree count. */
+  def buildAnnoyIndex(conf: TaskConf, dataFrame: DataFrame): Unit = {
+    val itemFactorsPath = conf.options("itemFactors")
+    val tempPath = conf.options.getOrElse("tempPath", "/tmp")
+    val tempInputPath = tempPath + "/_tmp"
+    val annoyResultPath = conf.options("path")
+    val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
+
+    saveFeatures(dataFrame, itemFactorsPath)
+    copyToLocal(dataFrame.sparkSession.sparkContext.hadoopConfiguration, itemFactorsPath, tempInputPath)
+    // Drop any previous index output so the build starts from a clean directory.
+    FileUtil.fullyDelete(new File(annoyResultPath))
+    Annoy.create[Int](tempInputPath, numDimensions, outputDir = annoyResultPath, Angular)
+  }
+
+  /** Writes each row as "<id> <feature> <feature> ..." (space-joined) to `outputPath` through the
+    * CSV writer in overwrite mode. `idCol` is read as Int and `featuresCol` as Seq[Float]. */
+  def saveFeatures(dataFrame: DataFrame,
+                   outputPath: String,
+                   idCol: String = "id",
+                   featuresCol: String = "features"): Unit = {
+    import dataFrame.sparkSession.implicits._
+
+    val result = dataFrame.map { row =>
+      val id = row.getAs[Int](idCol)
+      val vector = row.getAs[Seq[Float]](featuresCol)
+      (Seq(id) ++ vector).mkString(" ")
+    }
+    result.write.mode(SaveMode.Overwrite).csv(outputPath)
+  }
+
+  /** Copies `remoteInputPath` to the local `localOutputPath`; with `merge` it goes through
+    * FileUtil.copyMerge, otherwise through copyToLocalFile. deleteOnExit is requested for the
+    * local path on the local FileSystem. */
+  def copyToLocal(configuration: Configuration,
+                  remoteInputPath: String,
+                  localOutputPath: String,
+                  merge: Boolean = true): Unit  = {
+    val fs = FileSystem.get(configuration)
+    val localFs = FileSystem.getLocal(configuration)
+    val localPath = new Path(localOutputPath)
+    localFs.deleteOnExit(localPath)
+    if (merge) {
+      // copyMerge does not overwrite an existing target file (per Hadoop FileUtil); remove a leftover one.
+      if (localFs.exists(localPath) && localFs.getFileStatus(localPath).isFile) localFs.delete(localPath, false)
+      FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, localPath, false, configuration, "")
+    } else fs.copyToLocalFile(new Path(remoteInputPath), localPath)
+  }
+}
+
+// Process task: runs ALSModelProcess.runALS on the first entry of inputMap and returns its result.
+class ALSModelProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
+  override def mandatoryOptions: Set[String] = Set.empty
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame =
+    ALSModelProcess.runALS(ss, conf, inputMap.head._2)
+}
+
+// Sink that builds an Annoy index from its input frame; batch only (streaming input is rejected).
+class AnnoyIndexBuildSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  override val FORMAT: String = "parquet"
+
+  override def mandatoryOptions: Set[String] = Set("path", "itemFactors")
+
+  override def write(inputDF: DataFrame): Unit = {
+    // Reject streaming input before doing any work on the frame.
+    if (inputDF.isStreaming) throw new IllegalStateException("AnnoyIndexBuildSink can not be run as streaming.")
+    // Pass the preprocessed/repartitioned frame to the builder; it used to be computed and then dropped.
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+    ALSModelProcess.buildAnnoyIndex(conf, df)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
deleted file mode 100644
index 6279e82..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.{Query, QueryParam}
-import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.s2jobs.task.TaskConf
-
-import scala.concurrent.{Await, ExecutionContext}
-import scala.concurrent.duration.Duration
-import scala.io.Source
-
-class ALSAnnoyBuildProcessTest extends IntegrateCommon with DataFrameSuiteBase {
-  import scala.collection.JavaConverters._
-
-  // this test require adding movie lens rating data(u.data, movie.txt) under resources
-  // so ignore for now until figure out how to automate download dataset.
-  ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
-    import spark.sqlContext.implicits._
-    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
-
-    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
-      val tokens = line.split("\t")
-      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
-    }.toDF("userId", "movieId", "rating")
-
-    val outputPath = "/tmp"
-    val localInputPath = "/tmp/annoy_input"
-    val localIndexPath = "/tmp/annoy_result"
-
-    val taskOptions = Map(
-      "outputPath" -> outputPath,
-      "localInputPath" -> localInputPath,
-      "localIndexPath" -> localIndexPath
-    )
-
-    val conf = TaskConf("test", "test", Nil, taskOptions)
-    ALSAnnoyBuildProcess.buildAnnoyIndex(spark, conf, ratings)
-
-    val labelName = "annoy_model_fetcher_test"
-
-    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
-    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
-
-    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
-    val serviceColumn =
-      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
-
-    val options = s"""{
-                     | "importer": {
-                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-                     | },
-                     | "fetcher": {
-                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
-                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
-                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
-                     |   "${AnnoyModelFetcher.DimensionKey}": 10
-                     | }
-                     |}""".stripMargin
-
-    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
-    val label = management.createLabel(
-      labelName,
-      serviceColumn,
-      serviceColumn,
-      true,
-      service.serviceName,
-      Seq.empty[Index].asJava,
-      Seq.empty[Prop].asJava,
-      "strong",
-      null,
-      -1,
-      "v3",
-      "gz",
-      options
-    )
-    val config = ConfigFactory.parseString(options)
-    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-    Await.result(importerFuture, Duration("3 minutes"))
-
-    Thread.sleep(10000)
-
-    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
-    val queryParam = QueryParam(labelName = labelName, limit = 5)
-
-    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-
-    stepResult.edgeWithScores.foreach { es =>
-      println(es.edge.tgtVertex.innerIdVal)
-    }
-
-    // clean up temp directory.
-    FileUtils.deleteDirectory(new File(outputPath))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
new file mode 100644
index 0000000..d16ebf0
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@ -0,0 +1,104 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.{Query, QueryParam}
+import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
+class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
+  import scala.collection.JavaConverters._
+
+  // This test requires the MovieLens rating data (u.data, movie.txt) to be added under resources,
+  // so it is disabled for now until the dataset download can be automated.
+//  ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
+//    import spark.sqlContext.implicits._
+//    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
+//
+//    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
+//      val tokens = line.split("\t")
+//      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
+//    }.toDF("userId", "movieId", "rating")
+//
+//    val outputPath = "/tmp"
+//    val localInputPath = "/tmp/annoy_input"
+//    val localIndexPath = "/tmp/annoy_result"
+//
+//    val taskOptions = Map(
+//      "outputPath" -> outputPath,
+//      "localInputPath" -> localInputPath,
+//      "localIndexPath" -> localIndexPath
+//    )
+//
+//    val conf = TaskConf("test", "test", Nil, taskOptions)
+//    ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
+//
+//    val labelName = "annoy_model_fetcher_test"
+//
+//    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
+//    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
+//
+//    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+//    val serviceColumn =
+//      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+//
+//    val options = s"""{
+//                     | "importer": {
+//                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
+//                     | },
+//                     | "fetcher": {
+//                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
+//                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
+//                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
+//                     |   "${AnnoyModelFetcher.DimensionKey}": 10
+//                     | }
+//                     |}""".stripMargin
+//
+//    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+//
+//    val label = management.createLabel(
+//      labelName,
+//      serviceColumn,
+//      serviceColumn,
+//      true,
+//      service.serviceName,
+//      Seq.empty[Index].asJava,
+//      Seq.empty[Prop].asJava,
+//      "strong",
+//      null,
+//      -1,
+//      "v3",
+//      "gz",
+//      options
+//    )
+//    val config = ConfigFactory.parseString(options)
+//    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+//    Await.result(importerFuture, Duration("3 minutes"))
+//
+//    Thread.sleep(10000)
+//
+//    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
+//    val queryParam = QueryParam(labelName = labelName, limit = 5)
+//
+//    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+//    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+//
+//    stepResult.edgeWithScores.foreach { es =>
+//      println(es.edge.tgtVertex.innerIdVal)
+//    }
+//
+//    // clean up temp directory.
+//    FileUtils.deleteDirectory(new File(outputPath))
+//  }
+
+}