You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:30:01 UTC

[17/25] incubator-s2graph git commit: Merge branch 'master' into S2GRAPH-206

Merge branch 'master' into S2GRAPH-206


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5ee1906f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5ee1906f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5ee1906f

Branch: refs/heads/master
Commit: 5ee1906fcfa127d8975e365b15ee07a57082fabf
Parents: 88eb052 16feda8
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 10 21:24:54 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri May 11 14:34:12 2018 +0900

----------------------------------------------------------------------
 dev_support/graph_mysql/schema.sql              |      1 +
 example/run.sh                                  |     14 +-
 .../org/apache/s2graph/core/schema/schema.sql   |      1 +
 .../org/apache/s2graph/core/EdgeFetcher.scala   |     38 +
 .../org/apache/s2graph/core/EdgeMutator.scala   |     45 +
 .../scala/org/apache/s2graph/core/Fetcher.scala |     17 -
 .../org/apache/s2graph/core/GraphUtil.scala     |      7 +
 .../org/apache/s2graph/core/Management.scala    |     71 +-
 .../scala/org/apache/s2graph/core/Mutator.scala |     22 -
 .../apache/s2graph/core/ResourceManager.scala   |    134 +
 .../scala/org/apache/s2graph/core/S2Graph.scala |    105 +-
 .../apache/s2graph/core/S2GraphFactory.scala    |     11 +-
 .../org/apache/s2graph/core/S2GraphLike.scala   |     49 +-
 .../apache/s2graph/core/TraversalHelper.scala   |      6 +-
 .../org/apache/s2graph/core/VertexFetcher.scala |     35 +
 .../org/apache/s2graph/core/VertexMutator.scala |     33 +
 .../core/fetcher/MemoryModelEdgeFetcher.scala   |     62 +
 .../core/fetcher/annoy/AnnoyModelFetcher.scala  |    115 +
 .../core/fetcher/fasttext/CopyModel.scala       |    121 +
 .../core/fetcher/fasttext/FastText.scala        |    194 +
 .../core/fetcher/fasttext/FastTextArgs.scala    |    116 +
 .../core/fetcher/fasttext/FastTextFetcher.scala |     56 +
 .../apache/s2graph/core/io/Conversions.scala    |      6 +-
 .../s2graph/core/model/ImportStatus.scala       |     40 -
 .../apache/s2graph/core/model/Importer.scala    |    103 -
 .../s2graph/core/model/MemoryModelFetcher.scala |     41 -
 .../s2graph/core/model/ModelManager.scala       |     85 -
 .../core/model/annoy/AnnoyModelFetcher.scala    |    115 -
 .../s2graph/core/model/fasttext/CopyModel.scala |    122 -
 .../s2graph/core/model/fasttext/FastText.scala  |    195 -
 .../core/model/fasttext/FastTextArgs.scala      |    119 -
 .../core/model/fasttext/FastTextFetcher.scala   |     64 -
 .../s2graph/core/rest/RequestParser.scala       |      8 +-
 .../org/apache/s2graph/core/schema/Label.scala  |     40 +-
 .../s2graph/core/schema/ServiceColumn.scala     |     47 +-
 .../storage/DefaultOptimisticEdgeMutator.scala  |    176 +
 .../core/storage/DefaultOptimisticMutator.scala |    171 -
 .../DefaultOptimisticVertexMutator.scala        |     44 +
 .../core/storage/OptimisticEdgeFetcher.scala    |     56 +
 .../core/storage/OptimisticMutator.scala        |     23 +-
 .../apache/s2graph/core/storage/Storage.scala   |     68 +-
 .../s2graph/core/storage/StorageReadable.scala  |     97 -
 .../storage/WriteWriteConflictResolver.scala    |      6 +-
 .../storage/hbase/AsynchbaseEdgeFetcher.scala   |    147 +
 .../hbase/AsynchbaseOptimisticEdgeFetcher.scala |     35 +
 .../hbase/AsynchbaseOptimisticMutator.scala     |    142 +
 .../core/storage/hbase/AsynchbaseStorage.scala  |    185 +-
 .../hbase/AsynchbaseStorageReadable.scala       |    367 -
 .../hbase/AsynchbaseStorageWritable.scala       |    142 -
 .../storage/hbase/AsynchbaseVertexFetcher.scala |     87 +
 .../core/storage/rocks/RocksEdgeFetcher.scala   |     91 +
 .../rocks/RocksOptimisticEdgeFetcher.scala      |     41 +
 .../storage/rocks/RocksOptimisticMutator.scala  |    133 +
 .../core/storage/rocks/RocksStorage.scala       |    101 +-
 .../storage/rocks/RocksStorageReadable.scala    |    234 -
 .../storage/rocks/RocksStorageWritable.scala    |    133 -
 .../core/storage/rocks/RocksVertexFetcher.scala |    114 +
 .../s2graph/core/utils/ImportStatus.scala       |     59 +
 .../apache/s2graph/core/utils/Importer.scala    |    122 +
 .../s2graph/core/utils/SafeUpdateCache.scala    |     11 +-
 s2core/src/test/resources/item_factors.ann      |    Bin 137800 -> 0 bytes
 s2core/src/test/resources/movie.dict            |   1682 -
 s2core/src/test/resources/test-index.dict       | 100000 ----------------
 s2core/src/test/resources/test-index.tree       |    Bin 18824 -> 0 bytes
 s2core/src/test/resources/user_factors.ann      |    Bin 76804 -> 0 bytes
 .../s2graph/core/TestCommonWithModels.scala     |     14 +-
 .../s2graph/core/fetcher/EdgeFetcherTest.scala  |     89 +
 .../apache/s2graph/core/model/FetcherTest.scala |    145 -
 .../s2graph/core/model/HDFSImporterTest.scala   |     96 -
 .../model/fasttext/FastTextFetcherTest.scala    |     78 -
 .../core/tinkerpop/S2GraphProvider.scala        |     73 +-
 .../core/tinkerpop/structure/S2GraphTest.scala  |      6 +-
 .../apache/s2graph/graphql/GraphQLServer.scala  |      8 +-
 .../org/apache/s2graph/graphql/HttpServer.scala |      4 +-
 .../graphql/repository/GraphRepository.scala    |      2 +-
 .../apache/s2graph/s2jobs/BaseSparkTest.scala   |     13 +-
 .../custom/process/ALSModelProcessTest.scala    |    200 +-
 77 files changed, 2982 insertions(+), 104451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/example/run.sh
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 4b2274a,004b6e8..651323f
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@@ -161,6 -159,30 +159,35 @@@ object S2Graph 
      }
      ConfigFactory.parseMap(kvs)
    }
+ 
+   def initMutators(graph: S2GraphLike): Unit = {
+     val management = graph.management
+ 
+     ServiceColumn.findAll().foreach { column =>
+       management.updateVertexMutator(column, column.options)
+     }
+ 
+     Label.findAll().foreach { label =>
+       management.updateEdgeMutator(label, label.options)
+     }
+   }
+ 
+   def initFetchers(graph: S2GraphLike): Unit = {
+     val management = graph.management
+ 
+     ServiceColumn.findAll().foreach { column =>
+       management.updateVertexFetcher(column, column.options)
+     }
+ 
+     Label.findAll().foreach { label =>
+       management.updateEdgeFetcher(label, label.options)
+     }
+   }
++
++  def loadFetchersAndMutators(graph: S2GraphLike): Unit = {
++    initFetchers(graph)
++    initMutators(graph)
++  }
  }
  
  class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
@@@ -250,22 -275,43 +277,42 @@@
      storagePool.getOrElse(s"label:${label.label}", defaultStorage)
    }
  
-   //TODO:
-   override def getFetcher(column: ServiceColumn): Fetcher = {
-     getStorage(column.service).reader
 -
+   /* Currently, each getter on Fetcher and Mutator missing proper implementation
+   *  Please discuss what is proper way to maintain resources here and provide
+   *  right implementation(S2GRAPH-213).
+   * */
+   override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
+     resourceManager.getOrElseUpdateVertexFetcher(column)
+       .getOrElse(defaultStorage.vertexFetcher)
    }
  
-   override def getFetcher(label: Label): Fetcher = {
-     if (label.fetchConfigExist) modelManager.getFetcher(label)
-     else getStorage(label).reader
+   override def getEdgeFetcher(label: Label): EdgeFetcher = {
+     resourceManager.getOrElseUpdateEdgeFetcher(label)
+       .getOrElse(defaultStorage.edgeFetcher)
    }
  
-   override def getMutator(column: ServiceColumn): Mutator = {
-     getStorage(column.service).mutator
+   override def getAllVertexFetchers(): Seq[VertexFetcher] = {
+     resourceManager.getAllVertexFetchers()
    }
  
-   override def getMutator(label: Label): Mutator = {
-     getStorage(label).mutator
+   override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+     resourceManager.getAllEdgeFetchers()
+   }
+ 
+   override def getVertexMutator(column: ServiceColumn): VertexMutator = {
+     resourceManager.getOrElseUpdateVertexMutator(column)
+       .getOrElse(defaultStorage.vertexMutator)
+   }
+ 
+   override def getEdgeMutator(label: Label): EdgeMutator = {
+     resourceManager.getOrElseUpdateEdgeMutator(label)
+       .getOrElse(defaultStorage.edgeMutator)
+   }
+ 
+   //TODO:
+   override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
+ //    getStorage(label).optimisticEdgeFetcher
+     null
    }
  
    //TODO:

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index 0000000,0000000..f46df13
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@@ -1,0 -1,0 +1,115 @@@
++package org.apache.s2graph.core.fetcher.annoy
++
++import annoy4s.Converters.KeyConverter
++import annoy4s._
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++
++import scala.concurrent.{ExecutionContext, Future}
++
++object AnnoyModelFetcher {
++  val IndexFilePathKey = "annoyIndexFilePath"
++//  val DictFilePathKey = "annoyDictFilePath"
++  val DimensionKey = "annoyIndexDimension"
++  val IndexTypeKey = "annoyIndexType"
++
++  //  def loadDictFromLocal(file: File): Map[Int, String] = {
++  //    val files = if (file.isDirectory) {
++  //      file.listFiles()
++  //    } else {
++  //      Array(file)
++  //    }
++  //
++  //    files.flatMap { file =>
++  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
++  //        val tokens = line.stripMargin.split(",")
++  //        try {
++  //          val tpl = if (tokens.length < 2) {
++  //            (tokens.head.toInt, tokens.head)
++  //          } else {
++  //            (tokens.head.toInt, tokens.tail.head)
++  //          }
++  //          Seq(tpl)
++  //        } catch {
++  //          case e: Exception => Nil
++  //        }
++  //      }
++  //    }.toMap
++  //  }
++
++  def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
++    Annoy.load[T](indexPath)
++  }
++
++  //  def buildIndex(indexPath: String,
++  //                 dictPath: String,
++  //                 dimension: Int,
++  //                 indexType: IndexType): ANNIndexWithDict = {
++  //    val dict = loadDictFromLocal(new File(dictPath))
++  //    val index = new ANNIndex(dimension, indexPath, indexType)
++  //
++  //    ANNIndexWithDict(index, dict)
++  //  }
++  //
++  //  def buildIndex(config: Config): ANNIndexWithDict = {
++  //    val indexPath = config.getString(IndexFilePathKey)
++  //    val dictPath = config.getString(DictFilePathKey)
++  //
++  //    val dimension = config.getInt(DimensionKey)
++  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
++  //
++  //    buildIndex(indexPath, dictPath, dimension, indexType)
++  //  }
++}
++
++//
++//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
++//  val dictRev = dict.map(kv => kv._2 -> kv._1)
++//}
++
++class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  import AnnoyModelFetcher._
++
++  val builder = graph.elementBuilder
++
++  //  var model: ANNIndexWithDict = _
++  var model: Annoy[String] = _
++
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++
++    model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
++  }
++
++  /** Fetch **/
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = queryRequests.map { queryRequest =>
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++
++      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
++        val tgtVertexId = builder.newVertexId(queryParam.label.service,
++          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
++
++        val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  override def close(): Unit = {
++    // do clean up
++    model.close
++  }
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
index 0000000,0000000..d23b390
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
@@@ -1,0 -1,0 +1,121 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{BufferedInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.apache.s2graph.core.fetcher.fasttext
++import org.rocksdb._
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++object CopyModel {
++
++  def writeArgs(db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    db.put(handle, wo, "args".getBytes("UTF-8"), args.serialize)
++    wo.close()
++    println("done                                                      ")
++  }
++
++  def writeVocab(is: InputStream, db: RocksDB,
++                 vocabHandle: ColumnFamilyHandle, labelHandle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    val bb = ByteBuffer.allocate(13).order(ByteOrder.LITTLE_ENDIAN)
++    val wb = new ArrayBuffer[Byte]
++    for (wid <- 0 until args.size) {
++      bb.clear()
++      wb.clear()
++      var b = is.read()
++      while (b != 0) {
++        wb += b.toByte
++        b = is.read()
++      }
++      bb.putInt(wid)
++      is.read(bb.array(), 4, 9)
++      db.put(vocabHandle, wo, wb.toArray, bb.array())
++
++      if (bb.get(12) == 1) {
++        val label = wid - args.nwords
++        db.put(labelHandle, ByteBuffer.allocate(4).putInt(label).array(), wb.toArray)
++      }
++
++      if ((wid + 1) % 1000 == 0)
++        print(f"\rprocessing ${100 * (wid + 1) / args.size.toFloat}%.2f%%")
++    }
++    println("\rdone                                                      ")
++    wo.close()
++  }
++
++  def writeVectors(is: InputStream, db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    require(is.read() == 0, "not implemented")
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    val bb = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
++    val key = ByteBuffer.allocate(8)
++    val value = new Array[Byte](args.dim * 4)
++    is.read(bb.array())
++    val m = bb.getLong
++    val n = bb.getLong
++    require(n * 4 == value.length)
++    var i = 0L
++    while (i < m) {
++      key.clear()
++      key.putLong(i)
++      is.read(value)
++      db.put(handle, wo, key.array(), value)
++      if ((i + 1) % 1000 == 0)
++        print(f"\rprocessing ${100 * (i + 1) / m.toFloat}%.2f%%")
++      i += 1
++    }
++    println("\rdone                                                      ")
++    wo.close()
++  }
++
++  def printHelp(): Unit = {
++    println("usage: CopyModel <in> <out>")
++  }
++
++  def copy(in: String, out: String): Unit = {
++    RocksDB.destroyDB(out, new Options)
++
++    val dbOptions = new DBOptions()
++      .setCreateIfMissing(true)
++      .setCreateMissingColumnFamilies(true)
++      .setAllowMmapReads(false)
++      .setMaxOpenFiles(500000)
++      .setDbWriteBufferSize(134217728)
++      .setMaxBackgroundCompactions(20)
++
++    val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++    descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++    descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++    descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++    descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++    val handles = new util.LinkedList[ColumnFamilyHandle]()
++    val db = RocksDB.open(dbOptions, out, descriptors, handles)
++
++    val is = new BufferedInputStream(new FileInputStream(in))
++    val fastTextArgs = FastTextArgs.fromInputStream(is)
++
++    require(fastTextArgs.magic == FastText.FASTTEXT_FILEFORMAT_MAGIC_INT32)
++    require(fastTextArgs.version == FastText.FASTTEXT_VERSION)
++
++    println("step 1: writing args")
++    writeArgs(db, handles.get(0), fastTextArgs)
++    println("step 2: writing vocab")
++    writeVocab(is, db, handles.get(1), handles.get(0), fastTextArgs)
++    println("step 3: writing input vectors")
++    writeVectors(is, db, handles.get(2), fastTextArgs)
++    println("step 4: writing output vectors")
++    writeVectors(is, db, handles.get(3), fastTextArgs)
++    println("step 5: compactRange")
++    db.compactRange()
++    println("done")
++
++    handles.asScala.foreach(_.close())
++    db.close()
++    is.close()
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
index 0000000,0000000..c465bd8
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
@@@ -1,0 -1,0 +1,194 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyHandle, DBOptions, RocksDB}
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++case class Line(labels: Array[Int], words: Array[Long])
++
++case class Entry(wid: Int, count: Long, tpe: Byte, subwords: Array[Long])
++
++object FastText {
++  val EOS = "</s>"
++  val BOW = "<"
++  val EOW = ">"
++
++  val FASTTEXT_VERSION = 12 // Version 1b
++  val FASTTEXT_FILEFORMAT_MAGIC_INT32 = 793712314
++
++  val MODEL_CBOW = 1
++  val MODEL_SG = 2
++  val MODEL_SUP = 3
++
++  val LOSS_HS = 1
++  val LOSS_NS = 2
++  val LOSS_SOFTMAX = 3
++
++  val DBPathKey = "dbPath"
++
++  def tokenize(in: String): Array[String] = in.split("\\s+") ++ Array("</s>")
++
++  def getSubwords(word: String, minn: Int, maxn: Int): Array[String] = {
++    val l = math.max(minn, 1)
++    val u = math.min(maxn, word.length)
++    val r = l to u flatMap word.sliding
++    r.filterNot(s => s == BOW || s == EOW).toArray
++  }
++
++  def hash(str: String): Long = {
++    var h = 2166136261L.toInt
++    for (b <- str.getBytes) {
++      h = (h ^ b) * 16777619
++    }
++    h & 0xffffffffL
++  }
++
++}
++
++class FastText(name: String) extends AutoCloseable {
++
++  import FastText._
++
++  private val dbOptions = new DBOptions()
++  private val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++  descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++  descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++  private val handles = new util.LinkedList[ColumnFamilyHandle]()
++  private val db = RocksDB.openReadOnly(dbOptions, name, descriptors, handles)
++
++  private val defaultHandle = handles.get(0)
++  private val vocabHandle = handles.get(1)
++  private val inputVectorHandle = handles.get(2)
++  private val outputVectorHandle = handles.get(3)
++
++  private val args = FastTextArgs.fromByteArray(db.get(defaultHandle, "args".getBytes("UTF-8")))
++  private val wo = loadOutputVectors()
++  private val labels = loadLabels()
++
++  println(args)
++
++  require(args.magic == FASTTEXT_FILEFORMAT_MAGIC_INT32)
++  require(args.version == FASTTEXT_VERSION)
++
++  // only sup/softmax supported
++  // others are the future work.
++  require(args.model == MODEL_SUP)
++  require(args.loss == LOSS_SOFTMAX)
++
++  private def getVector(handle: ColumnFamilyHandle, key: Long): Array[Float] = {
++    val keyBytes = ByteBuffer.allocate(8).putLong(key).array()
++    val bb = ByteBuffer.wrap(db.get(handle, keyBytes)).order(ByteOrder.LITTLE_ENDIAN)
++    Array.fill(args.dim)(bb.getFloat)
++  }
++
++  private def loadOutputVectors(): Array[Array[Float]] =
++    Array.tabulate(args.nlabels)(key => getVector(outputVectorHandle, key.toLong))
++
++  private def loadLabels(): Array[String] = {
++    val result = new Array[String](args.nlabels)
++    val it = db.newIterator(defaultHandle)
++    var i = 0
++    it.seekToFirst()
++    while (it.isValid) {
++      val key = ByteBuffer.wrap(it.key()).getInt()
++      if (key < args.nlabels) {
++        require(i == key)
++        result(i) = new String(it.value(), "UTF-8")
++        i += 1
++      }
++      it.next()
++    }
++    result
++  }
++
++  def getInputVector(key: Long): Array[Float] = getVector(inputVectorHandle, key)
++
++  def getOutputVector(key: Long): Array[Float] = getVector(outputVectorHandle, key)
++
++  def getEntry(word: String): Entry = {
++    val raw = db.get(vocabHandle, word.getBytes("UTF-8"))
++    if (raw == null) {
++      Entry(-1, 0L, 1, Array.emptyLongArray)
++    } else {
++      val bb = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN)
++      val wid = bb.getInt
++      val count = bb.getLong
++      val tpe = bb.get
++      val subwords = if (word != EOS && tpe == 0) Array(wid.toLong) ++ computeSubwords(BOW + word + EOW) else Array(wid.toLong)
++      Entry(wid, count, tpe, subwords)
++    }
++  }
++
++  def computeSubwords(word: String): Array[Long] =
++    getSubwords(word, args.minn, args.maxn).map { w => args.nwords + (hash(w) % args.bucket.toLong) }
++
++  def getLine(in: String): Line = {
++    val tokens = tokenize(in)
++    val words = new ArrayBuffer[Long]()
++    val labels = new ArrayBuffer[Int]()
++    tokens foreach { token =>
++      val Entry(wid, count, tpe, subwords) = getEntry(token)
++      if (tpe == 0) {
++        // addSubwords
++        if (wid < 0) { // OOV
++          if (token != EOS) {
++            words ++= computeSubwords(BOW + token + EOW)
++          }
++        } else {
++          words ++= subwords
++        }
++      } else if (tpe == 1 && wid > 0) {
++        labels += wid - args.nwords
++      }
++    }
++    Line(labels.toArray, words.toArray)
++  }
++
++  def computeHidden(input: Array[Long]): Array[Float] = {
++    val hidden = new Array[Float](args.dim)
++    for (row <- input.map(getInputVector)) {
++      var i = 0
++      while (i < hidden.length) {
++        hidden(i) += row(i) / input.length
++        i += 1
++      }
++    }
++    hidden
++  }
++
++  def predict(line: Line, k: Int = 1): Array[(String, Float)] = {
++    val hidden = computeHidden(line.words)
++    val output = wo.map { o =>
++      o.zip(hidden).map(a => a._1 * a._2).sum
++    }
++    val max = output.max
++    var i = 0
++    var z = 0.0f
++    while (i < output.length) {
++      output(i) = math.exp((output(i) - max).toDouble).toFloat
++      z += output(i)
++      i += 1
++    }
++    i = 0
++    while (i < output.length) {
++      output(i) /= z
++      i += 1
++    }
++    output.zipWithIndex.sortBy(-_._1).take(k).map { case (prob, i) =>
++      labels(i) -> prob
++    }
++  }
++
++  def close(): Unit = {
++    handles.asScala.foreach(_.close())
++    dbOptions.close()
++    db.close()
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
index 0000000,0000000..0ad0b15
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
@@@ -1,0 -1,0 +1,116 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{ByteArrayInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++
++case class FastTextArgs(
++                         magic: Int,
++                         version: Int,
++                         dim: Int,
++                         ws: Int,
++                         epoch: Int,
++                         minCount: Int,
++                         neg: Int,
++                         wordNgrams: Int,
++                         loss: Int,
++                         model: Int,
++                         bucket: Int,
++                         minn: Int,
++                         maxn: Int,
++                         lrUpdateRate: Int,
++                         t: Double,
++                         size: Int,
++                         nwords: Int,
++                         nlabels: Int,
++                         ntokens: Long,
++                         pruneidxSize: Long) {
++
++  def serialize: Array[Byte] = {
++    val bb = ByteBuffer.allocate(92).order(ByteOrder.LITTLE_ENDIAN)
++    bb.putInt(magic)
++    bb.putInt(version)
++    bb.putInt(dim)
++    bb.putInt(ws)
++    bb.putInt(epoch)
++    bb.putInt(minCount)
++    bb.putInt(neg)
++    bb.putInt(wordNgrams)
++    bb.putInt(loss)
++    bb.putInt(model)
++    bb.putInt(bucket)
++    bb.putInt(minn)
++    bb.putInt(maxn)
++    bb.putInt(lrUpdateRate)
++    bb.putDouble(t)
++    bb.putInt(size)
++    bb.putInt(nwords)
++    bb.putInt(nlabels)
++    bb.putLong(ntokens)
++    bb.putLong(pruneidxSize)
++    bb.array()
++  }
++
++  override def toString: String = {
++    s"""magic:        $magic
++       |version:      $version
++       |dim:          $dim
++       |ws :          $ws
++       |epoch:        $epoch
++       |minCount:     $minCount
++       |neg:          $neg
++       |wordNgrams:   $wordNgrams
++       |loss:         $loss
++       |model:        $model
++       |bucket:       $bucket
++       |minn:         $minn
++       |maxn:         $maxn
++       |lrUpdateRate: $lrUpdateRate
++       |t:            $t
++       |size:         $size
++       |nwords:       $nwords
++       |nlabels:      $nlabels
++       |ntokens:      $ntokens
++       |pruneIdxSize: $pruneidxSize
++       |""".stripMargin
++  }
++
++}
++
++object FastTextArgs {
++
++  private def getInt(implicit inputStream: InputStream, buffer: Array[Byte]): Int = {
++    inputStream.read(buffer, 0, 4)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getInt
++  }
++
++  private def getLong(implicit inputStream: InputStream, buffer: Array[Byte]): Long = {
++    inputStream.read(buffer, 0, 8)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getLong
++  }
++
++  private def getDouble(implicit inputStream: InputStream, buffer: Array[Byte]): Double = {
++    inputStream.read(buffer, 0, 8)
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN).getDouble
++  }
++
++  def fromByteArray(ar: Array[Byte]): FastTextArgs =
++    fromInputStream(new ByteArrayInputStream(ar))
++
++  def fromInputStream(inputStream: InputStream): FastTextArgs = {
++    implicit val is: InputStream = inputStream
++    implicit val bytes: Array[Byte] = new Array[Byte](8)
++    FastTextArgs(
++      getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt,
++      getInt, getInt, getInt, getInt, getDouble, getInt, getInt, getInt, getLong, getLong)
++  }
++
++  def main(args: Array[String]): Unit = {
++    val args0 = FastTextArgs.fromInputStream(new FileInputStream("/Users/emeth.kim/d/g/fastText/dataset/sample.model.bin"))
++    val serialized = args0.serialize
++    val args1 = FastTextArgs.fromByteArray(serialized)
++
++    println(args0)
++    println(args1)
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 0000000,0000000..af1c0cc
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@@ -1,0 -1,0 +1,56 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++import org.apache.s2graph.core.utils.logger
++
++import scala.concurrent.{ExecutionContext, Future}
++
++
++class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  val builder = graph.elementBuilder
++  var fastText: FastText = _
++
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++    val dbPath = config.getString(FastText.DBPathKey)
++
++    try {
++      fastText = new FastText(dbPath)
++    } catch {
++      case e: Throwable =>
++        logger.error(s"[Init]: Failed.", e)
++        println(e)
++        throw e
++    }
++  }
++
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = queryRequests.map { queryRequest =>
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++      val line = fastText.getLine(vertex.innerId.toIdString())
++
++      val edgeWithScores = fastText.predict(line, queryParam.limit).map { case (_label, score) =>
++        val tgtVertexId = builder.newVertexId(queryParam.label.service,
++          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), _label)
++
++        val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  override def close(): Unit = if (fastText != null) fastText.close()
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index b10feb9,c3abd03..0000000
deleted file mode 100644,100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ /dev/null
@@@ -1,97 -1,102 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements.  See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership.  The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License.  You may obtain a copy of the License at
-- * 
-- *   http://www.apache.org/licenses/LICENSE-2.0
-- * 
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied.  See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--package org.apache.s2graph.core.storage
--
--import com.typesafe.config.Config
--import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
--import org.apache.s2graph.core._
--import org.apache.s2graph.core.types.VertexId
--import org.apache.s2graph.core.utils.logger
--
--import scala.concurrent.{ExecutionContext, Future}
--
- trait StorageReadable extends Fetcher {
 -trait StorageReadable extends EdgeFetcher {
--  val io: StorageIO
--  val serDe: StorageSerDe
--// /**
--//    * responsible to fire parallel fetch call into storage and create future that will return merged result.
--//    *
--//    * @param queryRequests
--//    * @param prevStepEdges
--//    * @return
--//    */
--//  def fetches(queryRequests: Seq[QueryRequest],
--//              prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
--
--  def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
--
--  def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
 -
 -
 -
 -
 -
--
--  protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
-   protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
 -//  protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
--
--  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
--    val queryParam = QueryParam(labelName = edge.innerLabel.label,
--      direction = GraphUtil.fromDirection(edge.getDir()),
--      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
--      cacheTTLInMillis = -1)
--    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
--    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
--
--    fetchKeyValues(queryRequest, edge).map { kvs =>
--      val (edgeOpt, kvOpt) =
--        if (kvs.isEmpty) (None, None)
--        else {
--          import CanSKeyValue._
--          val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
--          val _kvOpt = kvs.headOption
--          (snapshotEdgeOpt, _kvOpt)
--        }
--      (edgeOpt, kvOpt)
--    } recoverWith { case ex: Throwable =>
--      logger.error(s"fetchQueryParam failed. fallback return.", ex)
--      throw new FetchTimeoutException(s"${edge.toLogString}")
--    }
--  }
- 
-   def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
-     def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
-       if (kvs.isEmpty) Nil
-       else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
-     }
- 
-     val futures = vertices.map { vertex =>
-       val queryParam = QueryParam.Empty
-       val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-       val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
- 
-       fetchKeyValues(queryRequest, vertex).map { kvs =>
-         fromResult(kvs, vertex.serviceColumn.schemaVersion)
-       } recoverWith {
-         case ex: Throwable => Future.successful(Nil)
-       }
-     }
--
-     Future.sequence(futures).map(_.flatten)
-   }
 -//  def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
 -//    def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
 -//      if (kvs.isEmpty) Nil
 -//      else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
 -//    }
 -//
 -//    val futures = vertices.map { vertex =>
 -//      val queryParam = QueryParam.Empty
 -//      val q = Query.toQuery(Seq(vertex), Seq(queryParam))
 -//      val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
 -//
 -//      fetchKeyValues(queryRequest, vertex).map { kvs =>
 -//        fromResult(kvs, vertex.serviceColumn.schemaVersion)
 -//      } recoverWith {
 -//        case ex: Throwable => Future.successful(Nil)
 -//      }
 -//    }
 -//
 -//    Future.sequence(futures).map(_.flatten)
 -//  }
--
--}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 8dfbe1e,e6779fa..1f93174
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -66,6 -66,6 +66,8 @@@ class BaseSparkTest extends FunSuite wi
      super.beforeAll()
      s2 = S2GraphHelper.getS2Graph(s2Config, true)
  
++    S2Graph.loadFetchersAndMutators(s2)
++    
      deleteRecursively(new File(options.output))
      deleteRecursively(new File(options.tempDir))
  

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
index bd14352,0000000..3f12e8c
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@@ -1,229 -1,0 +1,143 @@@
 +package org.apache.s2graph.s2jobs.task.custom.process
 +
 +import java.io.File
 +
 +import com.holdenkarau.spark.testing.DataFrameSuiteBase
- import com.typesafe.config.ConfigFactory
 +import org.apache.commons.io.FileUtils
- import org.apache.s2graph.core.Integrate.IntegrateCommon
 +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
- import org.apache.s2graph.core.model.annoy.AnnoyModelFetcher
- import org.apache.s2graph.core.{Query, QueryParam}
- import org.apache.s2graph.core.model.ModelManager
++import org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher
++import org.apache.s2graph.core.{Query, QueryParam, ResourceManager}
 +import org.apache.s2graph.core.schema.Label
++import org.apache.s2graph.s2jobs.BaseSparkTest
 +import org.apache.s2graph.s2jobs.task.TaskConf
 +
 +import scala.concurrent.{Await, ExecutionContext}
 +import scala.concurrent.duration.Duration
 +import scala.io.Source
 +
- class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
++class ALSModelProcessTest extends BaseSparkTest {
 +  import scala.collection.JavaConverters._
 +
-   // this test require adding movie lens rating data(u.data, movie.txt) under resources
-   // so ignore for now until figure out how to automate download dataset.
- //  ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
- //    import spark.sqlContext.implicits._
- //    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
- //
- //    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
- //      val tokens = line.split("\t")
- //      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
- //    }.toDF("userId", "movieId", "rating")
- //
- //    val outputPath = "/tmp"
- //    val localInputPath = "/tmp/annoy_input"
- //    val localIndexPath = "/tmp/annoy_result"
- //
- //    val taskOptions = Map(
- //      "outputPath" -> outputPath,
- //      "localInputPath" -> localInputPath,
- //      "localIndexPath" -> localIndexPath
- //    )
- //
- //    val conf = TaskConf("test", "test", Nil, taskOptions)
- //    ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
- //
- //    val labelName = "annoy_model_fetcher_test"
- //
- //    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
- //    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
- //
- //    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
- //    val serviceColumn =
- //      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
- //
- //    val options = s"""{
- //                     | "importer": {
- //                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
- //                     | },
- //                     | "fetcher": {
- //                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
- //                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
- //                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
- //                     |   "${AnnoyModelFetcher.DimensionKey}": 10
- //                     | }
- //                     |}""".stripMargin
- //
- //    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
- //
- //    val label = management.createLabel(
- //      labelName,
- //      serviceColumn,
- //      serviceColumn,
- //      true,
- //      service.serviceName,
- //      Seq.empty[Index].asJava,
- //      Seq.empty[Prop].asJava,
- //      "strong",
- //      null,
- //      -1,
- //      "v3",
- //      "gz",
- //      options
- //    )
- //    val config = ConfigFactory.parseString(options)
- //    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
- //    Await.result(importerFuture, Duration("3 minutes"))
- //
- //    Thread.sleep(10000)
- //
- //    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
- //    val queryParam = QueryParam(labelName = labelName, limit = 5)
- //
- //    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
- //    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
- //
- //    stepResult.edgeWithScores.foreach { es =>
- //      println(es.edge.tgtVertex.innerIdVal)
- //    }
- //
- //    // clean up temp directory.
- //    FileUtils.deleteDirectory(new File(outputPath))
- //  }
- 
-   def annoyLabelOptions(indexPath: String, dictPath: String): String = {
++  // this test require adding movie
++  def annoyLabelOptions(indexPath: String): String = {
 +    val options = s"""{
-                      | "importer": {
-                      |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-                      | },
 +                     | "fetcher": {
-                      |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
++                     |   "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher",
 +                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${indexPath}",
-                      |   "${AnnoyModelFetcher.DictFilePathKey}": "${dictPath}",
 +                     |   "${AnnoyModelFetcher.DimensionKey}": 10
 +                     | }
 +                     |}""".stripMargin
 +    options
 +  }
-   def labelImport(labelName: String, indexPath: String, dictPath: String): Label = {
++  def createLabel(labelName: String): Label = {
++    val management = s2.management
 +    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
 +    val serviceColumn =
 +      management.createServiceColumn("s2graph", "movie", "string", Seq(Prop("age", "0", "int", true)))
 +
-     val options = annoyLabelOptions(indexPath, dictPath)
- 
-     Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
- 
-     val label = management.createLabel(
-       labelName,
-       serviceColumn,
-       serviceColumn,
-       true,
-       service.serviceName,
-       Seq.empty[Index].asJava,
-       Seq.empty[Prop].asJava,
-       "strong",
-       null,
-       -1,
-       "v3",
-       "gz",
-       options
-     )
- 
-     val config = ConfigFactory.parseString(options)
-     val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-     Await.result(importerFuture, Duration("3 minutes"))
++    Label.findByName(labelName, useCache = false) match {
++      case None =>
++        management.createLabel(
++          labelName,
++          service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++          service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++          service.serviceName,
++          Seq.empty[Index],
++          Seq.empty[Prop],
++          isDirected = true,
++          consistencyLevel = "strong",
++          hTableName = None,
++          hTableTTL = None,
++          schemaVersion = "v3",
++          compressionAlgorithm = "gz",
++          options = None
++        ).get
++      case Some(label) => label
++    }
++  }
++
++  def registerEdgeFetcher(labelName: String, indexPath: String): Label = {
++    val label = createLabel(labelName)
++    s2.management.updateEdgeFetcher(label, Option(annoyLabelOptions(indexPath)))
 +
 +    Thread.sleep(10000)
 +
 +    label
 +  }
 +
 +  def buildALS(ratingsPath: String, indexPath: String) = {
 +    import spark.sqlContext.implicits._
 +
 +    FileUtils.deleteQuietly(new File(indexPath))
 +
 +    val buffer = scala.collection.mutable.ListBuffer.empty[(Int, Int, Float)]
 +
 +    val lines = Source.fromFile(ratingsPath).getLines()
 +    // skip over header.
 +    lines.next()
 +
 +    while (lines.hasNext) {
 +      val line = lines.next()
 +      try {
 +        val Array(userId, movieId, rating, ts) = line.split(",")
 +        buffer += ((userId.toInt, movieId.toInt, rating.toFloat))
 +      } catch {
 +        case e: Exception => // skip over.
 +      }
 +    }
 +
 +    val rating = buffer.toDF("userId", "movieId", "rating")
 +
 +    val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil,
 +      options = Map.empty)
 +
 +    val process = new ALSModelProcess(processConf)
 +    val df = process.execute(spark, Map("test" -> rating))
 +
 +    val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
 +      options = Map("path" -> indexPath, "itemFactors" -> indexPath))
 +
 +    val sink = new AnnoyIndexBuildSink("sinkTest", sinkConf)
 +    sink.write(df)
 +  }
 +
++  def generateDataset = {
++    import sys.process._
++
++    val generateInputScript = "sh ./example/movielens/generate_input.sh"
++    generateInputScript !
++  }
++
 +  test("ALS ModelProcess and AnnoyIndexBuildSink") {
-     import spark.sqlContext.implicits._
++    val labelName = "annoy_index_test"
++
++    generateDataset
 +
-     val inputPath = "/Users/shon/Workspace/incubator-s2graph/example/movielens/input/ratings.csv"
-     val indexPath = "./annoy_result"
-     val dictPath = "./example/movielens/input/movie.dict"
++    val inputPath = "input/ratings.csv"
++    val indexPath = "annoy_result"
++//    val dictPath = "input/movie.dict"
 +
 +    buildALS(inputPath, indexPath)
 +
-     val labelName = "annoy_index_test"
-     val label = labelImport(labelName, indexPath, dictPath)
- //    val options = annoyLabelOptions(indexPath, dictPath)
- //
- //    val config = ConfigFactory.parseString(label.options.get).getConfig("fetcher")
- //    val config = ConfigFactory.parseString(options).getConfig("fetcher")
- 
- //    val ANNIndexWithDict(index, dict) = AnnoyModelFetcher.buildIndex(config)
- //    val v = index.getItemVector(1)
- //
- //    import scala.collection.JavaConverters._
- //    index.getNearest(v, 10).asScala.foreach { x =>
- //      println(x)
- //    }
- 
- 
- //
-     val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
++
++    val label = registerEdgeFetcher(labelName, indexPath)
++
++    val service = s2.management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
 +    val serviceColumn =
-       management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
++      s2.management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
 +
-     val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
++    val vertex = s2.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
 +    val queryParam = QueryParam(labelName = labelName, limit = 5)
 +
 +    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-     val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
++    val stepResult = Await.result(s2.getEdges(query), Duration("60 seconds"))
 +
 +    stepResult.edgeWithScores.foreach { es =>
 +      println(es.edge.tgtVertex.innerIdVal)
 +    }
++
++    Label.delete(label.id.get)
++    FileUtils.deleteDirectory(new File("input"))
++    FileUtils.deleteDirectory(new File("annoy_result"))
 +  }
 +}