You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/14 12:30:01 UTC
[17/25] incubator-s2graph git commit: Merge branch 'master' into
S2GRAPH-206
Merge branch 'master' into S2GRAPH-206
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/5ee1906f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/5ee1906f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/5ee1906f
Branch: refs/heads/master
Commit: 5ee1906fcfa127d8975e365b15ee07a57082fabf
Parents: 88eb052 16feda8
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 10 21:24:54 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri May 11 14:34:12 2018 +0900
----------------------------------------------------------------------
dev_support/graph_mysql/schema.sql | 1 +
example/run.sh | 14 +-
.../org/apache/s2graph/core/schema/schema.sql | 1 +
.../org/apache/s2graph/core/EdgeFetcher.scala | 38 +
.../org/apache/s2graph/core/EdgeMutator.scala | 45 +
.../scala/org/apache/s2graph/core/Fetcher.scala | 17 -
.../org/apache/s2graph/core/GraphUtil.scala | 7 +
.../org/apache/s2graph/core/Management.scala | 71 +-
.../scala/org/apache/s2graph/core/Mutator.scala | 22 -
.../apache/s2graph/core/ResourceManager.scala | 134 +
.../scala/org/apache/s2graph/core/S2Graph.scala | 105 +-
.../apache/s2graph/core/S2GraphFactory.scala | 11 +-
.../org/apache/s2graph/core/S2GraphLike.scala | 49 +-
.../apache/s2graph/core/TraversalHelper.scala | 6 +-
.../org/apache/s2graph/core/VertexFetcher.scala | 35 +
.../org/apache/s2graph/core/VertexMutator.scala | 33 +
.../core/fetcher/MemoryModelEdgeFetcher.scala | 62 +
.../core/fetcher/annoy/AnnoyModelFetcher.scala | 115 +
.../core/fetcher/fasttext/CopyModel.scala | 121 +
.../core/fetcher/fasttext/FastText.scala | 194 +
.../core/fetcher/fasttext/FastTextArgs.scala | 116 +
.../core/fetcher/fasttext/FastTextFetcher.scala | 56 +
.../apache/s2graph/core/io/Conversions.scala | 6 +-
.../s2graph/core/model/ImportStatus.scala | 40 -
.../apache/s2graph/core/model/Importer.scala | 103 -
.../s2graph/core/model/MemoryModelFetcher.scala | 41 -
.../s2graph/core/model/ModelManager.scala | 85 -
.../core/model/annoy/AnnoyModelFetcher.scala | 115 -
.../s2graph/core/model/fasttext/CopyModel.scala | 122 -
.../s2graph/core/model/fasttext/FastText.scala | 195 -
.../core/model/fasttext/FastTextArgs.scala | 119 -
.../core/model/fasttext/FastTextFetcher.scala | 64 -
.../s2graph/core/rest/RequestParser.scala | 8 +-
.../org/apache/s2graph/core/schema/Label.scala | 40 +-
.../s2graph/core/schema/ServiceColumn.scala | 47 +-
.../storage/DefaultOptimisticEdgeMutator.scala | 176 +
.../core/storage/DefaultOptimisticMutator.scala | 171 -
.../DefaultOptimisticVertexMutator.scala | 44 +
.../core/storage/OptimisticEdgeFetcher.scala | 56 +
.../core/storage/OptimisticMutator.scala | 23 +-
.../apache/s2graph/core/storage/Storage.scala | 68 +-
.../s2graph/core/storage/StorageReadable.scala | 97 -
.../storage/WriteWriteConflictResolver.scala | 6 +-
.../storage/hbase/AsynchbaseEdgeFetcher.scala | 147 +
.../hbase/AsynchbaseOptimisticEdgeFetcher.scala | 35 +
.../hbase/AsynchbaseOptimisticMutator.scala | 142 +
.../core/storage/hbase/AsynchbaseStorage.scala | 185 +-
.../hbase/AsynchbaseStorageReadable.scala | 367 -
.../hbase/AsynchbaseStorageWritable.scala | 142 -
.../storage/hbase/AsynchbaseVertexFetcher.scala | 87 +
.../core/storage/rocks/RocksEdgeFetcher.scala | 91 +
.../rocks/RocksOptimisticEdgeFetcher.scala | 41 +
.../storage/rocks/RocksOptimisticMutator.scala | 133 +
.../core/storage/rocks/RocksStorage.scala | 101 +-
.../storage/rocks/RocksStorageReadable.scala | 234 -
.../storage/rocks/RocksStorageWritable.scala | 133 -
.../core/storage/rocks/RocksVertexFetcher.scala | 114 +
.../s2graph/core/utils/ImportStatus.scala | 59 +
.../apache/s2graph/core/utils/Importer.scala | 122 +
.../s2graph/core/utils/SafeUpdateCache.scala | 11 +-
s2core/src/test/resources/item_factors.ann | Bin 137800 -> 0 bytes
s2core/src/test/resources/movie.dict | 1682 -
s2core/src/test/resources/test-index.dict | 100000 ----------------
s2core/src/test/resources/test-index.tree | Bin 18824 -> 0 bytes
s2core/src/test/resources/user_factors.ann | Bin 76804 -> 0 bytes
.../s2graph/core/TestCommonWithModels.scala | 14 +-
.../s2graph/core/fetcher/EdgeFetcherTest.scala | 89 +
.../apache/s2graph/core/model/FetcherTest.scala | 145 -
.../s2graph/core/model/HDFSImporterTest.scala | 96 -
.../model/fasttext/FastTextFetcherTest.scala | 78 -
.../core/tinkerpop/S2GraphProvider.scala | 73 +-
.../core/tinkerpop/structure/S2GraphTest.scala | 6 +-
.../apache/s2graph/graphql/GraphQLServer.scala | 8 +-
.../org/apache/s2graph/graphql/HttpServer.scala | 4 +-
.../graphql/repository/GraphRepository.scala | 2 +-
.../apache/s2graph/s2jobs/BaseSparkTest.scala | 13 +-
.../custom/process/ALSModelProcessTest.scala | 200 +-
77 files changed, 2982 insertions(+), 104451 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/example/run.sh
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 4b2274a,004b6e8..651323f
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@@ -161,6 -159,30 +159,35 @@@ object S2Graph
}
ConfigFactory.parseMap(kvs)
}
+
+   /**
+    * Registers a mutator for every schema element currently known:
+    * `management.updateVertexMutator` is called for each column returned by
+    * `ServiceColumn.findAll()` and `management.updateEdgeMutator` for each label
+    * returned by `Label.findAll()`, each with the element's own `options`.
+    */
+   def initMutators(graph: S2GraphLike): Unit = {
+     val mgmt = graph.management
+
+     for (column <- ServiceColumn.findAll()) {
+       mgmt.updateVertexMutator(column, column.options)
+     }
+
+     for (label <- Label.findAll()) {
+       mgmt.updateEdgeMutator(label, label.options)
+     }
+   }
+
+   /**
+    * Registers a fetcher for every schema element currently known:
+    * `management.updateVertexFetcher` is called for each column returned by
+    * `ServiceColumn.findAll()` and `management.updateEdgeFetcher` for each label
+    * returned by `Label.findAll()`, each with the element's own `options`.
+    */
+   def initFetchers(graph: S2GraphLike): Unit = {
+     val mgmt = graph.management
+
+     for (column <- ServiceColumn.findAll()) {
+       mgmt.updateVertexFetcher(column, column.options)
+     }
+
+     for (label <- Label.findAll()) {
+       mgmt.updateEdgeFetcher(label, label.options)
+     }
+   }
++
++  /** Convenience entry point: runs `initFetchers` and then `initMutators` on `graph`. */
++  def loadFetchersAndMutators(graph: S2GraphLike): Unit = {
++    // Order is kept as-is: fetchers first, mutators second.
++    initFetchers(graph); initMutators(graph)
++  }
}
class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
@@@ -250,22 -275,43 +277,42 @@@
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
- //TODO:
- override def getFetcher(column: ServiceColumn): Fetcher = {
- getStorage(column.service).reader
-
+ /* Currently, the getters for Fetcher and Mutator below lack a proper implementation.
+ * Please discuss the proper way to maintain these resources here and provide
+ * the right implementation (S2GRAPH-213).
+ * */
+ override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
+ resourceManager.getOrElseUpdateVertexFetcher(column)
+ .getOrElse(defaultStorage.vertexFetcher)
}
- override def getFetcher(label: Label): Fetcher = {
- if (label.fetchConfigExist) modelManager.getFetcher(label)
- else getStorage(label).reader
+ override def getEdgeFetcher(label: Label): EdgeFetcher = {
+ resourceManager.getOrElseUpdateEdgeFetcher(label)
+ .getOrElse(defaultStorage.edgeFetcher)
}
- override def getMutator(column: ServiceColumn): Mutator = {
- getStorage(column.service).mutator
+ override def getAllVertexFetchers(): Seq[VertexFetcher] = {
+ resourceManager.getAllVertexFetchers()
}
- override def getMutator(label: Label): Mutator = {
- getStorage(label).mutator
+ override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+ resourceManager.getAllEdgeFetchers()
+ }
+
+ override def getVertexMutator(column: ServiceColumn): VertexMutator = {
+ resourceManager.getOrElseUpdateVertexMutator(column)
+ .getOrElse(defaultStorage.vertexMutator)
+ }
+
+ override def getEdgeMutator(label: Label): EdgeMutator = {
+ resourceManager.getOrElseUpdateEdgeMutator(label)
+ .getOrElse(defaultStorage.edgeMutator)
+ }
+
+ //TODO:
+ override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
+ // getStorage(label).optimisticEdgeFetcher
+ null
}
//TODO:
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
index 0000000,0000000..f46df13
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/annoy/AnnoyModelFetcher.scala
@@@ -1,0 -1,0 +1,115 @@@
++package org.apache.s2graph.core.fetcher.annoy
++
++import annoy4s.Converters.KeyConverter
++import annoy4s._
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++
++import scala.concurrent.{ExecutionContext, Future}
++
++/**
++ * Configuration keys and the index-loading helper used by the `AnnoyModelFetcher` class.
++ *
++ * The commented-out sections are an alternative loader built around a dictionary file
++ * and an `ANNIndex`; they are retained verbatim and are not compiled.
++ */
++object AnnoyModelFetcher {
++  /** Config key whose value is passed to `buildAnnoy4s` by `AnnoyModelFetcher.init`. */
++  val IndexFilePathKey = "annoyIndexFilePath"
++  //  val DictFilePathKey = "annoyDictFilePath"
++  val DimensionKey = "annoyIndexDimension"
++  val IndexTypeKey = "annoyIndexType"
++
++  //  def loadDictFromLocal(file: File): Map[Int, String] = {
++  //    val files = if (file.isDirectory) {
++  //      file.listFiles()
++  //    } else {
++  //      Array(file)
++  //    }
++  //
++  //    files.flatMap { file =>
++  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
++  //        val tokens = line.stripMargin.split(",")
++  //        try {
++  //          val tpl = if (tokens.length < 2) {
++  //            (tokens.head.toInt, tokens.head)
++  //          } else {
++  //            (tokens.head.toInt, tokens.tail.head)
++  //          }
++  //          Seq(tpl)
++  //        } catch {
++  //          case e: Exception => Nil
++  //        }
++  //      }
++  //    }.toMap
++  //  }
++
++  /**
++   * Loads an annoy4s index from `indexPath`.
++   *
++   * @param indexPath location handed to `Annoy.load`
++   * @param converter key converter for the index's key type `T`
++   * @return the loaded index
++   */
++  def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] =
++    Annoy.load[T](indexPath)
++
++  //  def buildIndex(indexPath: String,
++  //                 dictPath: String,
++  //                 dimension: Int,
++  //                 indexType: IndexType): ANNIndexWithDict = {
++  //    val dict = loadDictFromLocal(new File(dictPath))
++  //    val index = new ANNIndex(dimension, indexPath, indexType)
++  //
++  //    ANNIndexWithDict(index, dict)
++  //  }
++  //
++  //  def buildIndex(config: Config): ANNIndexWithDict = {
++  //    val indexPath = config.getString(IndexFilePathKey)
++  //    val dictPath = config.getString(DictFilePathKey)
++  //
++  //    val dimension = config.getInt(DimensionKey)
++  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
++  //
++  //    buildIndex(indexPath, dictPath, dimension, indexType)
++  //  }
++}
++
++//
++//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
++// val dictRev = dict.map(kv => kv._2 -> kv._1)
++//}
++
++/**
++ * `EdgeFetcher` that answers edge queries from an annoy4s index instead of a storage
++ * backend: each query vertex id is looked up in the index and every hit is turned into
++ * an edge whose score is the value returned by the index.
++ *
++ * `init` must be called before `fetches`; until then `model` is null.
++ */
++class AnnoyModelFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  import AnnoyModelFetcher._
++
++  val builder = graph.elementBuilder
++
++  //  var model: ANNIndexWithDict = _
++  // Assigned by init(); remains null if init() was never called or failed.
++  var model: Annoy[String] = _
++
++  /** Loads the index from the path stored under `IndexFilePathKey` in `config`. */
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++
++    model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
++  }
++
++  /**
++   * Produces one `StepResult` per query request, synchronously.
++   *
++   * For each request the vertex id string is queried against the index with the
++   * request's `limit`; a miss (`None`) yields an empty result. `prevStepEdges` is
++   * not consulted.
++   */
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = queryRequests.map { queryRequest =>
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++
++      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
++        val tgtVertexId = builder.newVertexId(queryParam.label.service,
++          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
++
++        // Only attach the score as a property when the label declares a "score" property.
++        val props: Map[String, Any] = if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score) else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  /**
++   * Releases the index. Safe to call when `init` never ran or failed: a null `model`
++   * is skipped instead of raising a NullPointerException (same guard as
++   * `FastTextFetcher.close`).
++   */
++  override def close(): Unit = {
++    // do clean up
++    if (model != null) model.close
++  }
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
index 0000000,0000000..d23b390
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/CopyModel.scala
@@@ -1,0 -1,0 +1,121 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{BufferedInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.apache.s2graph.core.fetcher.fasttext
++import org.rocksdb._
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++/**
++ * One-off converter that copies a fastText binary model file into a RocksDB database
++ * with four column families: default (serialized args and label-index -> label text),
++ * "vocab" (word -> record), "i" (input vectors) and "o" (output vectors).
++ */
++object CopyModel {
++
++  /**
++   * Fills `buf[offset, offset + length)` from `is`, looping over short reads.
++   *
++   * `InputStream.read` may legally return fewer bytes than requested, so a single
++   * call is not enough to guarantee a complete record.
++   *
++   * @throws java.io.EOFException if the stream ends before `length` bytes were read
++   */
++  private def readFully(is: InputStream, buf: Array[Byte], offset: Int, length: Int): Unit = {
++    var done = 0
++    while (done < length) {
++      val n = is.read(buf, offset + done, length - done)
++      if (n < 0) {
++        throw new java.io.EOFException(s"unexpected end of model file: needed $length bytes, got $done")
++      }
++      done += n
++    }
++  }
++
++  /** Stores `args.serialize` under the key "args" in the column family `handle`. */
++  def writeArgs(db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    try db.put(handle, wo, "args".getBytes("UTF-8"), args.serialize)
++    finally wo.close()
++    println("done ")
++  }
++
++  /**
++   * Reads `args.size` vocabulary entries from `is` and writes them to the database.
++   *
++   * Each entry is a zero-terminated word followed by 9 bytes (copied verbatim; byte 12
++   * of the stored record is the entry type). The value written to `vocabHandle` is the
++   * little-endian entry index followed by those 9 bytes. Entries of type 1 are also
++   * written to `labelHandle` as (big-endian `wid - nwords`) -> word bytes.
++   *
++   * @throws java.io.EOFException if the stream ends inside an entry
++   */
++  def writeVocab(is: InputStream, db: RocksDB,
++                 vocabHandle: ColumnFamilyHandle, labelHandle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    try {
++      val record = ByteBuffer.allocate(13).order(ByteOrder.LITTLE_ENDIAN)
++      val word = new ArrayBuffer[Byte]
++      for (wid <- 0 until args.size) {
++        record.clear()
++        word.clear()
++        var b = is.read()
++        while (b != 0) {
++          // read() returns -1 forever at end of stream; without this check the loop never ends.
++          if (b < 0) throw new java.io.EOFException(s"unexpected end of model file in vocabulary entry $wid")
++          word += b.toByte
++          b = is.read()
++        }
++        record.putInt(wid)
++        readFully(is, record.array(), 4, 9)
++        val wordBytes = word.toArray
++        db.put(vocabHandle, wo, wordBytes, record.array())
++
++        if (record.get(12) == 1) {
++          val label = wid - args.nwords
++          // Same write options as every other put in this bulk load.
++          db.put(labelHandle, wo, ByteBuffer.allocate(4).putInt(label).array(), wordBytes)
++        }
++
++        if ((wid + 1) % 1000 == 0)
++          print(f"\rprocessing ${100 * (wid + 1) / args.size.toFloat}%.2f%%")
++      }
++      println("\rdone ")
++    } finally wo.close()
++  }
++
++  /**
++   * Reads one matrix from `is` and stores each row under its big-endian row index.
++   *
++   * Layout consumed: one flag byte that must be 0 (presumably the "quantized" marker;
++   * anything else is rejected as not implemented), two little-endian longs `m` (rows)
++   * and `n` (columns), then `m` rows of `n * 4` bytes, stored verbatim.
++   *
++   * @throws IllegalArgumentException if the flag byte is not 0 or `n` differs from `args.dim`
++   * @throws java.io.EOFException if the stream ends inside the matrix
++   */
++  def writeVectors(is: InputStream, db: RocksDB, handle: ColumnFamilyHandle, args: fasttext.FastTextArgs): Unit = {
++    require(is.read() == 0, "not implemented")
++    val wo = new WriteOptions().setDisableWAL(true).setSync(false)
++    try {
++      val header = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
++      val key = ByteBuffer.allocate(8)
++      val value = new Array[Byte](args.dim * 4)
++      readFully(is, header.array(), 0, 16)
++      val m = header.getLong
++      val n = header.getLong
++      require(n * 4 == value.length)
++      var i = 0L
++      while (i < m) {
++        key.clear()
++        key.putLong(i)
++        readFully(is, value, 0, value.length)
++        db.put(handle, wo, key.array(), value)
++        if ((i + 1) % 1000 == 0)
++          print(f"\rprocessing ${100 * (i + 1) / m.toFloat}%.2f%%")
++        i += 1
++      }
++      println("\rdone ")
++    } finally wo.close()
++  }
++
++  def printHelp(): Unit = {
++    println("usage: CopyModel <in> <out>")
++  }
++
++  /**
++   * Converts the fastText model file `in` into a fresh RocksDB database at `out`.
++   *
++   * Any existing database at `out` is destroyed first. All native resources (options,
++   * column family handles, database) and the input stream are released even when a
++   * step fails.
++   *
++   * @throws IllegalArgumentException if the file's magic number or version is not supported
++   * @throws java.io.EOFException if the model file is truncated
++   */
++  def copy(in: String, out: String): Unit = {
++    val destroyOptions = new Options
++    try RocksDB.destroyDB(out, destroyOptions)
++    finally destroyOptions.close()
++
++    val dbOptions = new DBOptions()
++      .setCreateIfMissing(true)
++      .setCreateMissingColumnFamilies(true)
++      .setAllowMmapReads(false)
++      .setMaxOpenFiles(500000)
++      .setDbWriteBufferSize(134217728)
++      .setMaxBackgroundCompactions(20)
++
++    try {
++      val is = new BufferedInputStream(new FileInputStream(in))
++      try {
++        val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++        descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++        descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++        descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++        descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++        val handles = new util.LinkedList[ColumnFamilyHandle]()
++        val db = RocksDB.open(dbOptions, out, descriptors, handles)
++
++        try {
++          val fastTextArgs = FastTextArgs.fromInputStream(is)
++
++          require(fastTextArgs.magic == FastText.FASTTEXT_FILEFORMAT_MAGIC_INT32)
++          require(fastTextArgs.version == FastText.FASTTEXT_VERSION)
++
++          println("step 1: writing args")
++          writeArgs(db, handles.get(0), fastTextArgs)
++          println("step 2: writing vocab")
++          writeVocab(is, db, handles.get(1), handles.get(0), fastTextArgs)
++          println("step 3: writing input vectors")
++          writeVectors(is, db, handles.get(2), fastTextArgs)
++          println("step 4: writing output vectors")
++          writeVectors(is, db, handles.get(3), fastTextArgs)
++          println("step 5: compactRange")
++          db.compactRange()
++          println("done")
++        } finally {
++          // Column family handles first, then the database that owns them.
++          handles.asScala.foreach(_.close())
++          db.close()
++        }
++      } finally is.close()
++    } finally dbOptions.close()
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
index 0000000,0000000..c465bd8
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastText.scala
@@@ -1,0 -1,0 +1,194 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.nio.{ByteBuffer, ByteOrder}
++import java.util
++
++import org.rocksdb.{ColumnFamilyDescriptor, ColumnFamilyHandle, DBOptions, RocksDB}
++
++import scala.collection.JavaConverters._
++import scala.collection.mutable.ArrayBuffer
++
++/**
++ * Result of `FastText.getLine`: the numeric form of one input string.
++ *
++ * @param labels label indices collected from the tokens (each is `wid - nwords`)
++ * @param words  row indices into the input vector table for the tokens' words and subwords
++ */
++case class Line(
++  labels: Array[Int],
++  words: Array[Long])
++
++/**
++ * One vocabulary record as returned by `FastText.getEntry`.
++ *
++ * @param wid      entry index read from the vocab record; -1 when the token is not in the vocabulary
++ * @param count    the long stored after the index in the vocab record
++ * @param tpe      entry type byte; `getLine` treats 0 as a word and 1 as a label
++ * @param subwords input-vector row indices for this entry (the entry itself plus hashed subwords)
++ */
++case class Entry(
++  wid: Int,
++  count: Long,
++  tpe: Byte,
++  subwords: Array[Long])
++
++/** Constants and pure text helpers shared by the `FastText` model reader and `CopyModel`. */
++object FastText {
++  /** End-of-sentence token appended to every tokenized input. */
++  val EOS = "</s>"
++  /** Marker prepended to a word before subwords are extracted. */
++  val BOW = "<"
++  /** Marker appended to a word before subwords are extracted. */
++  val EOW = ">"
++
++  val FASTTEXT_VERSION = 12 // Version 1b
++  val FASTTEXT_FILEFORMAT_MAGIC_INT32 = 793712314
++
++  val MODEL_CBOW = 1
++  val MODEL_SG = 2
++  val MODEL_SUP = 3
++
++  val LOSS_HS = 1
++  val LOSS_NS = 2
++  val LOSS_SOFTMAX = 3
++
++  /** Config key holding the RocksDB path of a converted model. */
++  val DBPathKey = "dbPath"
++
++  /** Splits `in` on runs of whitespace and appends the `EOS` token. */
++  def tokenize(in: String): Array[String] = in.split("\\s+") :+ EOS
++
++  /**
++   * All substrings of `word` with length from `max(minn, 1)` to `min(maxn, word.length)`,
++   * shortest first, excluding the bare `BOW` / `EOW` markers.
++   *
++   * @return an empty array when the length range is empty (e.g. `maxn == 0`)
++   */
++  def getSubwords(word: String, minn: Int, maxn: Int): Array[String] = {
++    val shortest = math.max(minn, 1)
++    val longest = math.min(maxn, word.length)
++    (shortest to longest)
++      .flatMap(n => word.sliding(n))
++      .filterNot(s => s == BOW || s == EOW)
++      .toArray
++  }
++
++  /**
++   * 32-bit FNV-1a hash of the UTF-8 bytes of `str`, returned as an unsigned value in a Long.
++   *
++   * The bytes are taken as UTF-8 explicitly (not the platform default charset) so the
++   * same string maps to the same bucket on every JVM. Each byte is sign-extended before
++   * the XOR.
++   */
++  def hash(str: String): Long = {
++    val h = str.getBytes("UTF-8").foldLeft(2166136261L.toInt) { (acc, b) =>
++      (acc ^ b) * 16777619
++    }
++    h & 0xffffffffL
++  }
++
++}
++
++/**
++ * Read-only view over a fastText model that `CopyModel` converted into RocksDB.
++ *
++ * On construction the database at `name` is opened read-only, the serialized args are
++ * decoded, and all output vectors and label strings are loaded into memory. Only
++ * supervised models with softmax loss are accepted.
++ *
++ * @param name path of the RocksDB database
++ * @throws IllegalArgumentException if magic/version/model/loss are not the supported values
++ */
++class FastText(name: String) extends AutoCloseable {
++
++  import FastText._
++
++  private val dbOptions = new DBOptions()
++  private val descriptors = new java.util.LinkedList[ColumnFamilyDescriptor]()
++  descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY))
++  descriptors.add(new ColumnFamilyDescriptor("vocab".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("i".getBytes()))
++  descriptors.add(new ColumnFamilyDescriptor("o".getBytes()))
++  private val handles = new util.LinkedList[ColumnFamilyHandle]()
++  private val db = RocksDB.openReadOnly(dbOptions, name, descriptors, handles)
++
++  private val defaultHandle = handles.get(0)
++  private val vocabHandle = handles.get(1)
++  private val inputVectorHandle = handles.get(2)
++  private val outputVectorHandle = handles.get(3)
++
++  private val args = FastTextArgs.fromByteArray(db.get(defaultHandle, "args".getBytes("UTF-8")))
++  private val wo = loadOutputVectors()
++  private val labels = loadLabels()
++
++  println(args)
++
++  require(args.magic == FASTTEXT_FILEFORMAT_MAGIC_INT32)
++  require(args.version == FASTTEXT_VERSION)
++
++  // only sup/softmax supported
++  // others are the future work.
++  require(args.model == MODEL_SUP)
++  require(args.loss == LOSS_SOFTMAX)
++
++  /**
++   * Reads row `key` (big-endian long key) from `handle` and decodes `args.dim`
++   * little-endian floats.
++   *
++   * @throws NoSuchElementException if no row is stored under `key`
++   */
++  private def getVector(handle: ColumnFamilyHandle, key: Long): Array[Float] = {
++    val keyBytes = ByteBuffer.allocate(8).putLong(key).array()
++    val raw = db.get(handle, keyBytes)
++    // db.get returns null for a missing key; fail with a message instead of a bare NPE.
++    if (raw == null) throw new NoSuchElementException(s"no vector stored for row $key")
++    val bb = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN)
++    Array.fill(args.dim)(bb.getFloat)
++  }
++
++  /** Loads output vectors 0 until `args.nlabels` into memory. */
++  private def loadOutputVectors(): Array[Array[Float]] =
++    Array.tabulate(args.nlabels)(key => getVector(outputVectorHandle, key.toLong))
++
++  /**
++   * Scans the default column family and collects the label strings.
++   *
++   * Keys are decoded as big-endian ints; only keys below `args.nlabels` are labels
++   * (other keys, such as "args", decode to larger values and are skipped). Labels must
++   * appear in index order.
++   */
++  private def loadLabels(): Array[String] = {
++    val result = new Array[String](args.nlabels)
++    val it = db.newIterator(defaultHandle)
++    try {
++      var i = 0
++      it.seekToFirst()
++      while (it.isValid) {
++        val key = ByteBuffer.wrap(it.key()).getInt()
++        if (key < args.nlabels) {
++          require(i == key)
++          result(i) = new String(it.value(), "UTF-8")
++          i += 1
++        }
++        it.next()
++      }
++    } finally it.close() // the iterator holds native resources and must be released
++    result
++  }
++
++  def getInputVector(key: Long): Array[Float] = getVector(inputVectorHandle, key)
++
++  def getOutputVector(key: Long): Array[Float] = getVector(outputVectorHandle, key)
++
++  /**
++   * Looks `word` up in the vocabulary.
++   *
++   * @return the stored entry; for a type-0 entry other than `EOS` the subwords are the
++   *         entry index followed by the hashed subwords of `BOW + word + EOW`, otherwise
++   *         just the entry index. A word that is not stored yields
++   *         `Entry(-1, 0L, 1, empty)`.
++   */
++  def getEntry(word: String): Entry = {
++    val raw = db.get(vocabHandle, word.getBytes("UTF-8"))
++    if (raw == null) {
++      Entry(-1, 0L, 1, Array.emptyLongArray)
++    } else {
++      val bb = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN)
++      val wid = bb.getInt
++      val count = bb.getLong
++      val tpe = bb.get
++      val subwords = if (word != EOS && tpe == 0) Array(wid.toLong) ++ computeSubwords(BOW + word + EOW) else Array(wid.toLong)
++      Entry(wid, count, tpe, subwords)
++    }
++  }
++
++  /** Maps every subword of `word` to an input-vector row: `nwords + hash(subword) % bucket`. */
++  def computeSubwords(word: String): Array[Long] =
++    getSubwords(word, args.minn, args.maxn).map { w => args.nwords + (hash(w) % args.bucket.toLong) }
++
++  /**
++   * Tokenizes `in` and converts the tokens into word rows and label indices.
++   *
++   * Out-of-vocabulary tokens (`wid < 0`) contribute only their hashed subwords. The
++   * check is made before looking at the entry type because `getEntry` reports missing
++   * tokens with type 1, which previously made the OOV branch unreachable and silently
++   * dropped such tokens.
++   */
++  def getLine(in: String): Line = {
++    val tokens = tokenize(in)
++    val words = new ArrayBuffer[Long]()
++    val lineLabels = new ArrayBuffer[Int]()
++    tokens foreach { token =>
++      val Entry(wid, _, tpe, subwords) = getEntry(token)
++      if (wid < 0) {
++        // OOV
++        if (token != EOS) {
++          words ++= computeSubwords(BOW + token + EOW)
++        }
++      } else if (tpe == 0) {
++        words ++= subwords
++      } else if (tpe == 1 && wid > 0) {
++        lineLabels += wid - args.nwords
++      }
++    }
++    Line(lineLabels.toArray, words.toArray)
++  }
++
++  /** Averages the input vectors of `input`; an empty `input` yields the zero vector. */
++  def computeHidden(input: Array[Long]): Array[Float] = {
++    val hidden = new Array[Float](args.dim)
++    for (row <- input.map(getInputVector)) {
++      var i = 0
++      while (i < hidden.length) {
++        hidden(i) += row(i) / input.length
++        i += 1
++      }
++    }
++    hidden
++  }
++
++  /**
++   * Scores every label for `line` with a softmax over the output vectors.
++   *
++   * @param k number of results to keep
++   * @return up to `k` (label, probability) pairs, highest probability first
++   */
++  def predict(line: Line, k: Int = 1): Array[(String, Float)] = {
++    val hidden = computeHidden(line.words)
++    val output = wo.map { o =>
++      o.zip(hidden).map(a => a._1 * a._2).sum
++    }
++    // Subtract the maximum before exponentiating to keep exp() in range.
++    val max = output.max
++    var i = 0
++    var z = 0.0f
++    while (i < output.length) {
++      output(i) = math.exp((output(i) - max).toDouble).toFloat
++      z += output(i)
++      i += 1
++    }
++    i = 0
++    while (i < output.length) {
++      output(i) /= z
++      i += 1
++    }
++    output.zipWithIndex.sortBy(-_._1).take(k).map { case (prob, idx) =>
++      labels(idx) -> prob
++    }
++  }
++
++  /** Releases native resources: column family handles, then the database, then its options. */
++  def close(): Unit = {
++    handles.asScala.foreach(_.close())
++    db.close()
++    dbOptions.close()
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
index 0000000,0000000..0ad0b15
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextArgs.scala
@@@ -1,0 -1,0 +1,116 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import java.io.{ByteArrayInputStream, FileInputStream, InputStream}
++import java.nio.{ByteBuffer, ByteOrder}
++
++/**
++ * Header of a fastText model: format magic/version, training arguments and dictionary
++ * sizes, in the order they are serialized.
++ */
++case class FastTextArgs(
++  magic: Int,
++  version: Int,
++  dim: Int,
++  ws: Int,
++  epoch: Int,
++  minCount: Int,
++  neg: Int,
++  wordNgrams: Int,
++  loss: Int,
++  model: Int,
++  bucket: Int,
++  minn: Int,
++  maxn: Int,
++  lrUpdateRate: Int,
++  t: Double,
++  size: Int,
++  nwords: Int,
++  nlabels: Int,
++  ntokens: Long,
++  pruneidxSize: Long) {
++
++  /**
++   * Encodes all fields, in declaration order, as 92 little-endian bytes:
++   * 14 ints, one double, 3 ints and 2 longs.
++   */
++  def serialize: Array[Byte] = {
++    val out = ByteBuffer.allocate(92).order(ByteOrder.LITTLE_ENDIAN)
++
++    val leadingInts = Seq(magic, version, dim, ws, epoch, minCount, neg, wordNgrams,
++      loss, model, bucket, minn, maxn, lrUpdateRate)
++    leadingInts.foreach(v => out.putInt(v))
++
++    out.putDouble(t)
++
++    Seq(size, nwords, nlabels).foreach(v => out.putInt(v))
++    Seq(ntokens, pruneidxSize).foreach(v => out.putLong(v))
++
++    out.array()
++  }
++
++  /** One "name: value" line per field, in declaration order. */
++  override def toString: String = {
++    val rendered =
++      s"""magic: $magic
++         |version: $version
++         |dim: $dim
++         |ws : $ws
++         |epoch: $epoch
++         |minCount: $minCount
++         |neg: $neg
++         |wordNgrams: $wordNgrams
++         |loss: $loss
++         |model: $model
++         |bucket: $bucket
++         |minn: $minn
++         |maxn: $maxn
++         |lrUpdateRate: $lrUpdateRate
++         |t: $t
++         |size: $size
++         |nwords: $nwords
++         |nlabels: $nlabels
++         |ntokens: $ntokens
++         |pruneIdxSize: $pruneidxSize
++         |"""
++    rendered.stripMargin
++  }
++
++}
++
++/** Decoders for [[FastTextArgs]]; the inverse of `FastTextArgs.serialize`. */
++object FastTextArgs {
++
++  /**
++   * Fills the first `length` bytes of `buffer` from `inputStream`, looping over short
++   * reads, and returns a little-endian view of `buffer`.
++   *
++   * A single `read` call may return fewer bytes than requested; ignoring its result
++   * would decode stale bytes left in the shared buffer.
++   *
++   * @throws java.io.EOFException if the stream ends before `length` bytes were read
++   */
++  private def readExactly(inputStream: InputStream, buffer: Array[Byte], length: Int): ByteBuffer = {
++    var filled = 0
++    while (filled < length) {
++      val n = inputStream.read(buffer, filled, length - filled)
++      if (n < 0) {
++        throw new java.io.EOFException(s"unexpected end of fastText args: needed $length bytes, got $filled")
++      }
++      filled += n
++    }
++    ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
++  }
++
++  private def getInt(implicit inputStream: InputStream, buffer: Array[Byte]): Int =
++    readExactly(inputStream, buffer, 4).getInt
++
++  private def getLong(implicit inputStream: InputStream, buffer: Array[Byte]): Long =
++    readExactly(inputStream, buffer, 8).getLong
++
++  private def getDouble(implicit inputStream: InputStream, buffer: Array[Byte]): Double =
++    readExactly(inputStream, buffer, 8).getDouble
++
++  /** Decodes args from a byte array laid out as produced by `FastTextArgs.serialize`. */
++  def fromByteArray(ar: Array[Byte]): FastTextArgs =
++    fromInputStream(new ByteArrayInputStream(ar))
++
++  /**
++   * Reads the 92-byte little-endian header from `inputStream`: 14 ints, one double,
++   * 3 ints and 2 longs, in field declaration order. The stream is left positioned just
++   * after the header and is not closed.
++   *
++   * @throws java.io.EOFException if the stream ends inside the header
++   */
++  def fromInputStream(inputStream: InputStream): FastTextArgs = {
++    implicit val is: InputStream = inputStream
++    implicit val bytes: Array[Byte] = new Array[Byte](8)
++    // Constructor arguments are evaluated left to right, which is the on-disk order.
++    FastTextArgs(
++      getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt, getInt,
++      getInt, getInt, getInt, getInt, getDouble, getInt, getInt, getInt, getLong, getLong)
++  }
++
++  /**
++   * Debug helper: decodes the header of a model file, re-serializes it, decodes it
++   * again and prints both results.
++   *
++   * The model path is taken from the first command-line argument; when none is given
++   * the previously hard-coded sample path is used.
++   */
++  def main(args: Array[String]): Unit = {
++    val path = args.headOption.getOrElse("/Users/emeth.kim/d/g/fastText/dataset/sample.model.bin")
++    val in = new FileInputStream(path)
++    val args0 = try FastTextArgs.fromInputStream(in) finally in.close()
++    val serialized = args0.serialize
++    val args1 = FastTextArgs.fromByteArray(serialized)
++
++    println(args0)
++    println(args1)
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
index 0000000,0000000..af1c0cc
new file mode 100644
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/fasttext/FastTextFetcher.scala
@@@ -1,0 -1,0 +1,56 @@@
++package org.apache.s2graph.core.fetcher.fasttext
++
++import com.typesafe.config.Config
++import org.apache.s2graph.core._
++import org.apache.s2graph.core.types.VertexId
++import org.apache.s2graph.core.utils.logger
++
++import scala.concurrent.{ExecutionContext, Future}
++
++
++/**
++ * `EdgeFetcher` backed by a `FastText` model: the query vertex id string is used as
++ * the input text and every predicted (label, probability) pair becomes an edge to the
++ * vertex whose id is that label.
++ *
++ * `init` must be called before `fetches`; until then `fastText` is null.
++ */
++class FastTextFetcher(val graph: S2GraphLike) extends EdgeFetcher {
++  val builder = graph.elementBuilder
++  var fastText: FastText = _
++
++  /**
++   * Opens the model stored at the path found under `FastText.DBPathKey` in `config`.
++   * Any failure is logged, printed and rethrown; `fastText` is left unassigned then.
++   */
++  override def init(config: Config)(implicit ec: ExecutionContext): Unit = {
++    val dbPath = config.getString(FastText.DBPathKey)
++
++    fastText =
++      try new FastText(dbPath)
++      catch {
++        case e: Throwable =>
++          logger.error(s"[Init]: Failed.", e)
++          println(e)
++          throw e
++      }
++  }
++
++  /**
++   * Produces one `StepResult` per query request, synchronously. The request's `limit`
++   * is the number of predictions kept; `prevStepEdges` is not consulted.
++   */
++  override def fetches(queryRequests: Seq[QueryRequest],
++                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
++    val stepResultLs = for (queryRequest <- queryRequests) yield {
++      val vertex = queryRequest.vertex
++      val queryParam = queryRequest.queryParam
++      val line = fastText.getLine(vertex.innerId.toIdString())
++      val predictions = fastText.predict(line, queryParam.limit)
++
++      val edgeWithScores = predictions.map { case (_label, score) =>
++        val tgtColumn = queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir)
++        val tgtVertexId = builder.newVertexId(queryParam.label.service, tgtColumn, _label)
++
++        // Only attach the score as a property when the label declares a "score" property.
++        val props: Map[String, Any] =
++          if (queryParam.label.metaPropsInvMap.contains("score")) Map("score" -> score)
++          else Map.empty
++        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction, props = props)
++
++        EdgeWithScore(edge, score, queryParam.label)
++      }
++
++      StepResult(edgeWithScores, Nil, Nil)
++    }
++
++    Future.successful(stepResultLs)
++  }
++
++  /** Closes the model if one was opened. */
++  override def close(): Unit = Option(fastText).foreach(_.close())
++
++  // not supported yet.
++  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] =
++    Future.successful(Nil)
++}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --cc s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index b10feb9,c3abd03..0000000
deleted file mode 100644,100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ /dev/null
@@@ -1,97 -1,102 +1,0 @@@
--/*
-- * Licensed to the Apache Software Foundation (ASF) under one
-- * or more contributor license agreements. See the NOTICE file
-- * distributed with this work for additional information
-- * regarding copyright ownership. The ASF licenses this file
-- * to you under the Apache License, Version 2.0 (the
-- * "License"); you may not use this file except in compliance
-- * with the License. You may obtain a copy of the License at
-- *
-- * http://www.apache.org/licenses/LICENSE-2.0
-- *
-- * Unless required by applicable law or agreed to in writing,
-- * software distributed under the License is distributed on an
-- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- * KIND, either express or implied. See the License for the
-- * specific language governing permissions and limitations
-- * under the License.
-- */
--
--package org.apache.s2graph.core.storage
--
--import com.typesafe.config.Config
--import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
--import org.apache.s2graph.core._
--import org.apache.s2graph.core.types.VertexId
--import org.apache.s2graph.core.utils.logger
--
--import scala.concurrent.{ExecutionContext, Future}
--
- trait StorageReadable extends Fetcher {
-trait StorageReadable extends EdgeFetcher {
-- val io: StorageIO
-- val serDe: StorageSerDe
--// /**
--// * responsible to fire parallel fetch call into storage and create future that will return merged result.
--// *
--// * @param queryRequests
--// * @param prevStepEdges
--// * @return
--// */
--// def fetches(queryRequests: Seq[QueryRequest],
--// prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
--
-- def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
--
-- def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
-
-
-
-
-
--
-- protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
- protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
-// protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
--
--
-- def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
-- val queryParam = QueryParam(labelName = edge.innerLabel.label,
-- direction = GraphUtil.fromDirection(edge.getDir()),
-- tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
-- cacheTTLInMillis = -1)
-- val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
-- val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
--
-- fetchKeyValues(queryRequest, edge).map { kvs =>
-- val (edgeOpt, kvOpt) =
-- if (kvs.isEmpty) (None, None)
-- else {
-- import CanSKeyValue._
-- val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
-- val _kvOpt = kvs.headOption
-- (snapshotEdgeOpt, _kvOpt)
-- }
-- (edgeOpt, kvOpt)
-- } recoverWith { case ex: Throwable =>
-- logger.error(s"fetchQueryParam failed. fallback return.", ex)
-- throw new FetchTimeoutException(s"${edge.toLogString}")
-- }
-- }
-
- def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
- def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
- if (kvs.isEmpty) Nil
- else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
- }
-
- val futures = vertices.map { vertex =>
- val queryParam = QueryParam.Empty
- val q = Query.toQuery(Seq(vertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
- fetchKeyValues(queryRequest, vertex).map { kvs =>
- fromResult(kvs, vertex.serviceColumn.schemaVersion)
- } recoverWith {
- case ex: Throwable => Future.successful(Nil)
- }
- }
--
- Future.sequence(futures).map(_.flatten)
- }
-// def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
-// def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
-// if (kvs.isEmpty) Nil
-// else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
-// }
-//
-// val futures = vertices.map { vertex =>
-// val queryParam = QueryParam.Empty
-// val q = Query.toQuery(Seq(vertex), Seq(queryParam))
-// val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-//
-// fetchKeyValues(queryRequest, vertex).map { kvs =>
-// fromResult(kvs, vertex.serviceColumn.schemaVersion)
-// } recoverWith {
-// case ex: Throwable => Future.successful(Nil)
-// }
-// }
-//
-// Future.sequence(futures).map(_.flatten)
-// }
--
--}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
index 8dfbe1e,e6779fa..1f93174
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/BaseSparkTest.scala
@@@ -66,6 -66,6 +66,8 @@@ class BaseSparkTest extends FunSuite wi
super.beforeAll()
s2 = S2GraphHelper.getS2Graph(s2Config, true)
++ S2Graph.loadFetchersAndMutators(s2)
++
deleteRecursively(new File(options.output))
deleteRecursively(new File(options.tempDir))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/5ee1906f/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --cc s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
index bd14352,0000000..3f12e8c
mode 100644,000000..100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@@ -1,229 -1,0 +1,143 @@@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
- import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
- import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
- import org.apache.s2graph.core.model.annoy.AnnoyModelFetcher
- import org.apache.s2graph.core.{Query, QueryParam}
- import org.apache.s2graph.core.model.ModelManager
++import org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher
++import org.apache.s2graph.core.{Query, QueryParam, ResourceManager}
+import org.apache.s2graph.core.schema.Label
++import org.apache.s2graph.s2jobs.BaseSparkTest
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
- class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
++class ALSModelProcessTest extends BaseSparkTest {
+ import scala.collection.JavaConverters._
+
- // this test require adding movie lens rating data(u.data, movie.txt) under resources
- // so ignore for now until figure out how to automate download dataset.
- // ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
- // import spark.sqlContext.implicits._
- // val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
- //
- // val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
- // val tokens = line.split("\t")
- // (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
- // }.toDF("userId", "movieId", "rating")
- //
- // val outputPath = "/tmp"
- // val localInputPath = "/tmp/annoy_input"
- // val localIndexPath = "/tmp/annoy_result"
- //
- // val taskOptions = Map(
- // "outputPath" -> outputPath,
- // "localInputPath" -> localInputPath,
- // "localIndexPath" -> localIndexPath
- // )
- //
- // val conf = TaskConf("test", "test", Nil, taskOptions)
- // ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
- //
- // val labelName = "annoy_model_fetcher_test"
- //
- // val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
- // val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
- //
- // val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
- // val serviceColumn =
- // management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
- //
- // val options = s"""{
- // | "importer": {
- // | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
- // | },
- // | "fetcher": {
- // | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
- // | "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
- // | "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
- // | "${AnnoyModelFetcher.DimensionKey}": 10
- // | }
- // |}""".stripMargin
- //
- // Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
- //
- // val label = management.createLabel(
- // labelName,
- // serviceColumn,
- // serviceColumn,
- // true,
- // service.serviceName,
- // Seq.empty[Index].asJava,
- // Seq.empty[Prop].asJava,
- // "strong",
- // null,
- // -1,
- // "v3",
- // "gz",
- // options
- // )
- // val config = ConfigFactory.parseString(options)
- // val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
- // Await.result(importerFuture, Duration("3 minutes"))
- //
- // Thread.sleep(10000)
- //
- // val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
- // val queryParam = QueryParam(labelName = labelName, limit = 5)
- //
- // val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
- // val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
- //
- // stepResult.edgeWithScores.foreach { es =>
- // println(es.edge.tgtVertex.innerIdVal)
- // }
- //
- // // clean up temp directory.
- // FileUtils.deleteDirectory(new File(outputPath))
- // }
-
- def annoyLabelOptions(indexPath: String, dictPath: String): String = {
++ // this test require adding movie
++ def annoyLabelOptions(indexPath: String): String = {
+ val options = s"""{
- | "importer": {
- | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
- | },
+ | "fetcher": {
- | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.annoy.AnnoyModelFetcher",
++ | "${ResourceManager.ClassNameKey}": "org.apache.s2graph.core.fetcher.annoy.AnnoyModelFetcher",
+ | "${AnnoyModelFetcher.IndexFilePathKey}": "${indexPath}",
- | "${AnnoyModelFetcher.DictFilePathKey}": "${dictPath}",
+ | "${AnnoyModelFetcher.DimensionKey}": 10
+ | }
+ |}""".stripMargin
+ options
+ }
- def labelImport(labelName: String, indexPath: String, dictPath: String): Label = {
++ def createLabel(labelName: String): Label = {
++ val management = s2.management
+ val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+ val serviceColumn =
+ management.createServiceColumn("s2graph", "movie", "string", Seq(Prop("age", "0", "int", true)))
+
- val options = annoyLabelOptions(indexPath, dictPath)
-
- Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
- val label = management.createLabel(
- labelName,
- serviceColumn,
- serviceColumn,
- true,
- service.serviceName,
- Seq.empty[Index].asJava,
- Seq.empty[Prop].asJava,
- "strong",
- null,
- -1,
- "v3",
- "gz",
- options
- )
-
- val config = ConfigFactory.parseString(options)
- val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
- Await.result(importerFuture, Duration("3 minutes"))
++ Label.findByName(labelName, useCache = false) match {
++ case None =>
++ management.createLabel(
++ labelName,
++ service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++ service.serviceName, serviceColumn.columnName, serviceColumn.columnType,
++ service.serviceName,
++ Seq.empty[Index],
++ Seq.empty[Prop],
++ isDirected = true,
++ consistencyLevel = "strong",
++ hTableName = None,
++ hTableTTL = None,
++ schemaVersion = "v3",
++ compressionAlgorithm = "gz",
++ options = None
++ ).get
++ case Some(label) => label
++ }
++ }
++
++ def registerEdgeFetcher(labelName: String, indexPath: String): Label = {
++ val label = createLabel(labelName)
++ s2.management.updateEdgeFetcher(label, Option(annoyLabelOptions(indexPath)))
+
+ Thread.sleep(10000)
+
+ label
+ }
+
+ def buildALS(ratingsPath: String, indexPath: String) = {
+ import spark.sqlContext.implicits._
+
+ FileUtils.deleteQuietly(new File(indexPath))
+
+ val buffer = scala.collection.mutable.ListBuffer.empty[(Int, Int, Float)]
+
+ val lines = Source.fromFile(ratingsPath).getLines()
+ // skip over header.
+ lines.next()
+
+ while (lines.hasNext) {
+ val line = lines.next()
+ try {
+ val Array(userId, movieId, rating, ts) = line.split(",")
+ buffer += ((userId.toInt, movieId.toInt, rating.toFloat))
+ } catch {
+ case e: Exception => // skip over.
+ }
+ }
+
+ val rating = buffer.toDF("userId", "movieId", "rating")
+
+ val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil,
+ options = Map.empty)
+
+ val process = new ALSModelProcess(processConf)
+ val df = process.execute(spark, Map("test" -> rating))
+
+ val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
+ options = Map("path" -> indexPath, "itemFactors" -> indexPath))
+
+ val sink = new AnnoyIndexBuildSink("sinkTest", sinkConf)
+ sink.write(df)
+ }
+
++ def generateDataset = {
++ import sys.process._
++
++ val generateInputScript = "sh ./example/movielens/generate_input.sh"
++ generateInputScript !
++ }
++
+ test("ALS ModelProcess and AnnoyIndexBuildSink") {
- import spark.sqlContext.implicits._
++ val labelName = "annoy_index_test"
++
++ generateDataset
+
- val inputPath = "/Users/shon/Workspace/incubator-s2graph/example/movielens/input/ratings.csv"
- val indexPath = "./annoy_result"
- val dictPath = "./example/movielens/input/movie.dict"
++ val inputPath = "input/ratings.csv"
++ val indexPath = "annoy_result"
++// val dictPath = "input/movie.dict"
+
+ buildALS(inputPath, indexPath)
+
- val labelName = "annoy_index_test"
- val label = labelImport(labelName, indexPath, dictPath)
- // val options = annoyLabelOptions(indexPath, dictPath)
- //
- // val config = ConfigFactory.parseString(label.options.get).getConfig("fetcher")
- // val config = ConfigFactory.parseString(options).getConfig("fetcher")
-
- // val ANNIndexWithDict(index, dict) = AnnoyModelFetcher.buildIndex(config)
- // val v = index.getItemVector(1)
- //
- // import scala.collection.JavaConverters._
- // index.getNearest(v, 10).asScala.foreach { x =>
- // println(x)
- // }
-
-
- //
- val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
++
++ val label = registerEdgeFetcher(labelName, indexPath)
++
++ val service = s2.management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+ val serviceColumn =
- management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
++ s2.management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+
- val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
++ val vertex = s2.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
+ val queryParam = QueryParam(labelName = labelName, limit = 5)
+
+ val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
- val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
++ val stepResult = Await.result(s2.getEdges(query), Duration("60 seconds"))
+
+ stepResult.edgeWithScores.foreach { es =>
+ println(es.edge.tgtVertex.innerIdVal)
+ }
++
++ Label.delete(label.id.get)
++ FileUtils.deleteDirectory(new File("input"))
++ FileUtils.deleteDirectory(new File("annoy_result"))
+ }
+}