You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:29:34 UTC

[2/8] incubator-s2graph git commit: add search vertices

add search vertices


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ba938bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ba938bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ba938bc0

Branch: refs/heads/master
Commit: ba938bc087898ba6221bbb53ea62e0ec12185fe3
Parents: 3ac20b4
Author: daewon <da...@apache.org>
Authored: Thu Apr 19 18:13:34 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Thu Apr 19 18:13:34 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/QueryParam.scala    |  91 +++++++++-------
 .../scala/org/apache/s2graph/core/S2Graph.scala |  33 ++++--
 .../s2graph/core/index/ESIndexProvider.scala    |  67 ++++++------
 .../s2graph/core/index/IndexProvider.scala      |   1 +
 .../core/index/LuceneIndexProvider.scala        | 107 +++++++++++--------
 .../s2graph/core/index/IndexProviderTest.scala  |  23 ++--
 .../graphql/repository/GraphRepository.scala    |  20 ++--
 .../s2graph/graphql/types/FieldResolver.scala   |   2 +
 .../apache/s2graph/graphql/types/S2Type.scala   |  27 ++++-
 9 files changed, 229 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 748e8c5..faf04db 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -37,9 +37,11 @@ import scala.util.{Success, Try}
 object Query {
   val initialScore = 1.0
   lazy val empty = Query()
+
   def apply(query: Query): Query = {
     Query(query.vertices, query.steps, query.queryOption, query.jsonQuery)
   }
+
   def toQuery(srcVertices: Seq[S2VertexLike], queryParams: Seq[QueryParam]) = Query(srcVertices, Vector(Step(queryParams)))
 
 }
@@ -49,9 +51,10 @@ case class MinShouldMatchParam(prop: String, count: Int, terms: Set[Any])
 object GroupBy {
   val Empty = GroupBy()
 }
+
 case class GroupBy(keys: Seq[String] = Nil,
                    limit: Int = Int.MaxValue,
-                   minShouldMatch: Option[MinShouldMatchParam]= None)
+                   minShouldMatch: Option[MinShouldMatchParam] = None)
 
 case class MultiQuery(queries: Seq[Query],
                       weights: Seq[Double],
@@ -79,12 +82,12 @@ case class QueryOption(removeCycle: Boolean = false,
                        ignorePrevStepCache: Boolean = false) {
   val orderByKeys = orderByColumns.map(_._1)
   val ascendingVals = orderByColumns.map(_._2)
-  val selectColumnsMap = selectColumns.map { c => c -> true } .toMap
+  val selectColumnsMap = selectColumns.map { c => c -> true }.toMap
   val scoreFieldIdx = orderByKeys.zipWithIndex.find(t => t._1 == "score").map(_._2).getOrElse(-1)
   val (edgeSelectColumns, propsSelectColumns) = selectColumns.partition(c => LabelMeta.defaultRequiredMetaNames.contains(c))
   /** */
   val edgeSelectColumnsFiltered = edgeSelectColumns
-//  val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c))
+  //  val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c))
   lazy val cacheKeyBytes: Array[Byte] = {
     val selectBytes = Bytes.toBytes(selectColumns.toString)
     val groupBytes = Bytes.toBytes(groupBy.keys.toString)
@@ -118,9 +121,10 @@ object EdgeTransformer {
 }
 
 /**
- * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold.
- * @param jsValue
- */
+  * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold.
+  *
+  * @param jsValue
+  */
 case class EdgeTransformer(jsValue: JsValue) {
   val Delimiter = "\\$"
   val targets = jsValue.asOpt[List[Vector[String]]].toList
@@ -204,13 +208,15 @@ case class Step(queryParams: Seq[QueryParam],
                 cacheTTL: Long = -1,
                 groupBy: GroupBy = GroupBy.Empty) {
 
-//  lazy val excludes = queryParams.filter(_.exclude)
-//  lazy val includes = queryParams.filterNot(_.exclude)
-//  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
+  //  lazy val excludes = queryParams.filter(_.exclude)
+  //  lazy val includes = queryParams.filterNot(_.exclude)
+  //  lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
 
   lazy val cacheKeyBytes = queryParams.map(_.toCacheKeyRaw(Array.empty[Byte])).foldLeft(Array.empty[Byte])(Bytes.add)
+
   def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong()
-//    MurmurHash3.bytesHash(toCacheKeyRaw(lss))
+
+  //    MurmurHash3.bytesHash(toCacheKeyRaw(lss))
 
   def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = {
     var bytes = Array.empty[Byte]
@@ -277,6 +283,7 @@ object QueryParam {
   val Delimiter = ","
   val maxMetaByte = (-1).toByte
   val fillArray = Array.fill(100)(maxMetaByte)
+
   import scala.collection.JavaConverters._
 
   def apply(labelWithDirection: LabelWithDirection): QueryParam = {
@@ -285,34 +292,43 @@ object QueryParam {
     QueryParam(labelName = label.label, direction = direction)
   }
 }
+
+/** Vertex search request for S2Graph.searchVertices; searchString is handed to the index provider. NOTE(review): offset/limit are not applied by the search path in this change. */
+case class VertexQueryParam(offset: Int,
+                            limit: Int,
+                            searchString: Option[String],
+                            vertexIds: Seq[VertexId] = Nil,
+                            fetchProp: Boolean = true)
+
 case class QueryParam(labelName: String,
-                        direction: String = "out",
-                        offset: Int = 0,
-                        limit: Int = S2Graph.DefaultFetchLimit,
-                        sample: Int = -1,
-                        maxAttempt: Int = 20,
-                        rpcTimeout: Int = 600000,
-                        cacheTTLInMillis: Long = -1L,
-                        indexName: String = LabelIndex.DefaultName,
-                        where: Try[Where] = Success(WhereParser.success),
-                        timestamp: Long = System.currentTimeMillis(),
-                        threshold: Double = Double.MinValue,
-                        rank: RankParam = RankParam.Default,
-                        intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None,
-                        durationOpt: Option[(Long, Long)] = None,
-                        exclude: Boolean = false,
-                        include: Boolean = false,
-                        has: Map[String, Any] = Map.empty,
-                        duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First,
-                        includeDegree: Boolean = false,
-                        scorePropagateShrinkage: Long = 500L,
-                        scorePropagateOp: String = "multiply",
-                        shouldNormalize: Boolean = false,
-                        whereRawOpt: Option[String] = None,
-                        cursorOpt: Option[String] = None,
-                        tgtVertexIdOpt: Option[Any] = None,
-                        edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson),
-                        timeDecay: Option[TimeDecay] = None) {
+                      direction: String = "out",
+                      offset: Int = 0,
+                      limit: Int = S2Graph.DefaultFetchLimit,
+                      sample: Int = -1,
+                      maxAttempt: Int = 20,
+                      rpcTimeout: Int = 600000,
+                      cacheTTLInMillis: Long = -1L,
+                      indexName: String = LabelIndex.DefaultName,
+                      where: Try[Where] = Success(WhereParser.success),
+                      timestamp: Long = System.currentTimeMillis(),
+                      threshold: Double = Double.MinValue,
+                      rank: RankParam = RankParam.Default,
+                      intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None,
+                      durationOpt: Option[(Long, Long)] = None,
+                      exclude: Boolean = false,
+                      include: Boolean = false,
+                      has: Map[String, Any] = Map.empty,
+                      duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First,
+                      includeDegree: Boolean = false,
+                      scorePropagateShrinkage: Long = 500L,
+                      scorePropagateOp: String = "multiply",
+                      shouldNormalize: Boolean = false,
+                      whereRawOpt: Option[String] = None,
+                      cursorOpt: Option[String] = None,
+                      tgtVertexIdOpt: Option[Any] = None,
+                      edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson),
+                      timeDecay: Option[TimeDecay] = None) {
+
   import JSONParser._
 
   //TODO: implement this.
@@ -447,6 +463,7 @@ object DuplicatePolicy extends Enumeration {
     }
   }
 }
+
 case class TimeDecay(initial: Double = 1.0,
                      lambda: Double = 0.1,
                      timeUnit: Double = 60 * 60 * 24,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 09f9c7c..2dc9f63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -33,7 +33,8 @@ import org.apache.s2graph.core.storage.rocks.RocksStorage
 import org.apache.s2graph.core.storage.{MutateResponse, Storage}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
+import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
 
 import scala.collection.JavaConversions._
@@ -58,7 +59,7 @@ object S2Graph {
     "hbase.table.name" -> "s2graph",
     "hbase.table.compression.algorithm" -> "gz",
     "phase" -> "dev",
-    "db.default.driver" ->  "org.h2.Driver",
+    "db.default.driver" -> "org.h2.Driver",
     "db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
     "db.default.password" -> "graph",
     "db.default.user" -> "graph",
@@ -110,7 +111,7 @@ object S2Graph {
     } {
       m.put(key, value)
     }
-    val config  = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
+    val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
     config
   }
 
@@ -134,7 +135,7 @@ object S2Graph {
 
     storageBackend match {
       case "hbase" =>
-        hbaseExecutor  =
+        hbaseExecutor =
           if (config.getString("hbase.zookeeper.quorum") == "localhost")
             AsynchbaseStorage.initLocalHBase(config)
           else
@@ -176,7 +177,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
 
   override val config = _config.withFallback(S2Graph.DefaultConfig)
 
-  val storageBackend = Try { config.getString("s2graph.storage.backend") }.getOrElse("hbase")
+  val storageBackend = Try {
+    config.getString("s2graph.storage.backend")
+  }.getOrElse("hbase")
 
   Model.apply(config)
   Model.loadCache()
@@ -260,6 +263,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
       localLongId.set(0l)
     }
 
+  def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
+    // explicit vertexIds first, then index hits; offset/limit page the merged, de-duplicated ids
+    val matchedVertices = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids =>
+      (queryParam.vertexIds ++ vids).distinct.drop(queryParam.offset).take(queryParam.limit).map(vid => elementBuilder.newVertex(vid))
+    }
+    if (queryParam.fetchProp) matchedVertices.flatMap(getVertices) // re-read the vertices that were built from ids only
+    else matchedVertices
+  }
+
   override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
     val verticesWithIdx = vertices.zipWithIndex
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
@@ -289,7 +301,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
   override def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
     def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
                                          withWait: Boolean = false): Future[Seq[MutateResponse]] = {
-      val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) }
+      val futures = vertices.map { vertex =>
+        storage.mutateVertex(zkQuorum, vertex, withWait)
+      }
       Future.sequence(futures)
     }
 
@@ -297,7 +311,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
       mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
     }
-    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+
+    Future.sequence(futures).flatMap { ls =>
+      indexProvider.mutateVerticesAsync(vertices).map { _ =>
+        ls.flatten.toSeq.sortBy(_._2).map(_._1)
+      }
+    }
   }
 
   override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index 10c9222..fbf76ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -25,12 +25,12 @@ import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
 import com.sksamuel.elastic4s.http.ElasticDsl._
 import com.sksamuel.elastic4s.http.HttpClient
 import com.typesafe.config.Config
 import org.apache.s2graph.core.io.Conversions
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.types.VertexId
-import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import play.api.libs.json.Json
+import play.api.libs.json.{Json, Reads}

 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
@@ -38,6 +38,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.util.Try

 class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider {
+
   import GlobalIndex._
   import IndexProvider._

@@ -103,10 +104,10 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind

   override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
     val bulkRequests = vertices.flatMap { vertex =>
-        toFields(vertex, forceToIndex).toSeq.map { fields =>
-          update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
-        }
+      toFields(vertex, forceToIndex).toSeq.map { fields =>
+        update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
       }
+    }

     if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
     else {
@@ -149,61 +150,56 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
     }
   }
 
-  override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] =
-    Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime)
-
-  override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = {
-    val field = eidField
-    val ids = new java.util.HashSet[EdgeId]
-
-    val queryString = buildQueryString(hasContainers)
+  private def fetchInner[T](queryString: String, indexKey: String, field: String, reads: Reads[T])(validate: (T => Boolean)): Future[util.List[T]] = {
+    val ids = new java.util.HashSet[T]
 
     client.execute {
-      search(GlobalIndex.EdgeIndexName).query(queryString)
+      search(indexKey).query(queryString)
     }.map { ret =>
       ret match {
         case Left(failure) =>
         case Right(results) =>
           results.result.hits.hits.foreach { searchHit =>
             searchHit.sourceAsMap.get(field).foreach { idValue =>
-              val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get
-
+              val id = reads.reads(Json.parse(idValue.toString)).get
               //TODO: Come up with better way to filter out hits with invalid meta.
-              EdgeId.isValid(id).foreach(ids.add)
+              if (validate(id)) ids.add(id)
             }
           }
       }
 
-      new util.ArrayList[EdgeId](ids)
+      new util.ArrayList(ids)
     }
   }
 
+  override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] =
+    Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime)
+
+  override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = {
+    val field = eidField
+
+    val queryString = buildQueryString(hasContainers)
+    fetchInner[EdgeId](queryString, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined)
+  }
 
   override def fetchVertexIds(hasContainers: util.List[HasContainer]): util.List[VertexId] =
     Await.result(fetchVertexIdsAsync(hasContainers), WaitTime)
 
   override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[VertexId]] = {
     val field = vidField
-    val ids = new java.util.HashSet[VertexId]
-
     val queryString = buildQueryString(hasContainers)
 
-    client.execute {
-      search(GlobalIndex.VertexIndexName).query(queryString)
-    }.map { ret =>
-      ret match {
-        case Left(failure) =>
-        case Right(results) =>
-          results.result.hits.hits.foreach { searchHit =>
-            searchHit.sourceAsMap.get(field).foreach { idValue =>
-              val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get
-              //TODO: Come up with better way to filter out hits with invalid meta.
-              VertexId.isValid(id).foreach(ids.add)
-            }
-          }
-      }
+    fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
+  }
+
+  override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = {
+    val field = vidField
+    val empty = new util.ArrayList[VertexId]()
 
-      new util.ArrayList[VertexId](ids)
+    vertexQueryParam.searchString match  {
+      case Some(queryString) =>
+        fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
+      case None => Future.successful(empty)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index ae632df..ffbebf4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -121,6 +121,7 @@ trait IndexProvider {
 
   def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId]
   def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]]
+  def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[java.util.List[VertexId]] = Future.successful(java.util.Collections.emptyList[VertexId]()) // default: no index-backed search; providers override
 
   def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean]
   def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]]

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index 841331d..68d481c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -19,6 +19,7 @@
 
 package org.apache.s2graph.core.index
 
+import java.io.File
 import java.util
 
 import com.typesafe.config.Config
@@ -26,20 +27,21 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer
 import org.apache.lucene.document.{Document, Field, StringField}
 import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
 import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
-import org.apache.lucene.search.IndexSearcher
-import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
+import org.apache.lucene.search.{IndexSearcher, Query}
+import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory}
 import org.apache.s2graph.core.io.Conversions
 import org.apache.s2graph.core.mysqls.GlobalIndex
 import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import play.api.libs.json.Json
+import play.api.libs.json.{Json, Reads}
 
 import scala.concurrent.Future
 
 
 class LuceneIndexProvider(config: Config) extends IndexProvider {
+
   import GlobalIndex._
   import IndexProvider._
 
@@ -49,10 +51,19 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
   val analyzer = new StandardAnalyzer()
   val writers = mutable.Map.empty[String, IndexWriter]
   val directories = mutable.Map.empty[String, BaseDirectory]
+  val baseDirectory = scala.util.Try(config.getString("index.provider.base.dir")).getOrElse(".")
+
+  private def getOrElseDirectory(indexName: String): BaseDirectory = {
+    val pathname = s"${baseDirectory}/${indexName}"
+    val dir = directories.getOrElseUpdate(indexName, new SimpleFSDirectory(new File(pathname).toPath))
+
+    dir
+  }
 
   private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
     writers.getOrElseUpdate(indexName, {
-      val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
+      val dir = getOrElseDirectory(indexName)
+
       val indexConfig = new IndexWriterConfig(analyzer)
       new IndexWriter(dir, indexConfig)
     })
@@ -124,8 +135,9 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     vertices.foreach { vertex =>
       toDocument(vertex, forceToIndex).foreach { doc =>
         val vId = vertex.id.toString()
+//        logger.error(s"DOC: ${doc}")
 
-        writer.updateDocument(new Term(IdField, vId), doc)
+        writer.updateDocument(new Term(vidField, vId), doc)
       }
     }
 
@@ -144,7 +156,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
       toDocument(edge, forceToIndex).foreach { doc =>
         val eId = edge.edgeId.toString
 
-        writer.updateDocument(new Term(IdField, eId), doc)
+        writer.updateDocument(new Term(eidField, eId), doc)
       }
     }
 
@@ -153,73 +165,85 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
     edges.map(_ => true)
   }

-  override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
-    val field = eidField
-    val ids = new java.util.HashSet[EdgeId]
-
-    val queryString = buildQueryString(hasContainers)
+  /** Runs `q` against the index named `indexKey`, decoding each hit's `field` with `reads`; a missing index yields an empty list. */
+  private def fetchInner[T](q: Query, indexKey: String, field: String, reads: Reads[T]): util.List[T] = {
+    val ids = new java.util.HashSet[T]
+    var reader: DirectoryReader = null // closed in the finally below

     try {
-      val q = new QueryParser(field, analyzer).parse(queryString)
-
-      val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName))
+      reader = DirectoryReader.open(getOrElseDirectory(indexKey)) // assign the outer var (no shadowing val) so finally can close it
       val searcher = new IndexSearcher(reader)

       val docs = searcher.search(q, hitsPerPage)
+      logger.debug(s"total hit: ${docs.scoreDocs.length}")

       docs.scoreDocs.foreach { scoreDoc =>
         val document = searcher.doc(scoreDoc.doc)
-        val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
-        ids.add(id);
-      }
+        logger.debug(s"DOC_IN_L: ${document.toString}")

-      reader.close()
-      ids
+        val id = reads.reads(Json.parse(document.get(field))).get
+        ids.add(id)
+      }
     } catch {
-      case ex: ParseException =>
-        logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-        ids
+      case _: org.apache.lucene.index.IndexNotFoundException => logger.info("Index file not found.")
+    } finally {
+      if (reader != null) reader.close()
     }

-    new util.ArrayList[EdgeId](ids)
+    new util.ArrayList[T](ids)
   }
 
   override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = {
     val field = vidField
-    val ids = new java.util.HashSet[VertexId]
     val queryString = buildQueryString(hasContainers)
 
     try {
       val q = new QueryParser(field, analyzer).parse(queryString)
+      fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+    } catch {
+      case ex: ParseException =>
+        logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+        util.Arrays.asList[VertexId]()
+    }
+  }
 
-      val reader = DirectoryReader.open(directories(GlobalIndex.VertexIndexName))
-      val searcher = new IndexSearcher(reader)
-
-      val docs = searcher.search(q, hitsPerPage)
-
-      docs.scoreDocs.foreach { scoreDoc =>
-        val document = searcher.doc(scoreDoc.doc)
-        val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
-        ids.add(id)
-      }
+  override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
+    val field = eidField
+    val queryString = buildQueryString(hasContainers)
 
-      reader.close()
-      ids
+    try {
+      val q = new QueryParser(field, analyzer).parse(queryString)
+      fetchInner[EdgeId](q, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)
     } catch {
       case ex: ParseException =>
         logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
-        ids
+        util.Arrays.asList[EdgeId]()
     }
+  }
+
+  override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
 
-    new util.ArrayList[VertexId](ids)
+  override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
+
+  override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = {
+    val ret = vertexQueryParam.searchString.fold(util.Arrays.asList[VertexId]()) { queryString =>
+      val field = vidField
+      try {
+        val q = new QueryParser(field, analyzer).parse(queryString)
+        fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+      } catch {
+        case ex: ParseException =>
+          logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+          util.Arrays.asList[VertexId]()
+      }
+    }
+
+    Future.successful(ret)
   }
 
   override def shutdown(): Unit = {
     writers.foreach { case (_, writer) => writer.close() }
   }
 
-  override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
-
-  override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 4889412..a5349dc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -29,6 +29,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper
 import org.apache.tinkerpop.gremlin.structure.T
 
 import scala.collection.JavaConversions._
+import scala.concurrent._
+import scala.concurrent.duration._
 
 class IndexProviderTest extends IntegrateCommon {
   import scala.concurrent.ExecutionContext.Implicits.global
@@ -74,14 +76,19 @@ class IndexProviderTest extends IntegrateCommon {
     Thread.sleep(1000)
 
     (0 until numOfTry).foreach { ith =>
-      val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1)))
-
-      var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
-      ids.head shouldBe vertex.id
-
-      ids.foreach { id =>
-        println(s"[Id]: $id")
-      }
+//      val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1)))
+      val hasContainer = new HasContainer(GlobalIndex.serviceColumnField, P.eq(testColumn.columnName))
+
+      val f = graph.searchVertices(VertexQueryParam(0, 100, Option(s"${GlobalIndex.serviceColumnField}:${testColumn.columnName}")))
+      val a = Await.result(f, Duration("60 sec"))
+      println(a)
+
+//      var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
+//      ids.head shouldBe vertex.id
+//
+//      ids.foreach { id =>
+//        println(s"[Id]: $id")
+//      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index c7ffae1..1f53c76 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -30,13 +30,14 @@ import org.slf4j.{Logger, LoggerFactory}
 import sangria.execution.deferred._
 import sangria.schema._
 
 import scala.concurrent._
 import scala.util.{Failure, Success, Try}
 
 object GraphRepository {
 
-  implicit val vertexHasId = new HasId[S2VertexLike, S2VertexLike] {
-    override def id(value: S2VertexLike): S2VertexLike = value
+  // vertexFetcher results are keyed by the VertexQueryParam that produced them.
+  implicit val vertexHasId = new HasId[(VertexQueryParam, Seq[S2VertexLike]), VertexQueryParam] {
+    override def id(value: (VertexQueryParam, Seq[S2VertexLike])): VertexQueryParam = value._1
   }
 
   implicit val edgeHasId = new HasId[(S2VertexLike, QueryParam, Seq[S2EdgeLike]), DeferFetchEdges] {
@@ -44,9 +45,14 @@ object GraphRepository {
       DeferFetchEdges(value._1, value._2)
   }
 
-  val vertexFetcher = Fetcher((ctx: GraphRepository, ids: Seq[S2VertexLike]) => {
-    ctx.getVertices(ids)
-  })
+  // Resolves each VertexQueryParam via GraphRepository.getVertices and pairs it
+  // with its result so vertexHasId can route results back to their params.
+  val vertexFetcher =
+    Fetcher((ctx: GraphRepository, ids: Seq[VertexQueryParam]) => {
+      implicit val ec = ctx.ec
+
+      Future.traverse(ids)(ctx.getVertices).map(vs => ids.zip(vs))
+    })
 
   val edgeFetcher = Fetcher((ctx: GraphRepository, ids: Seq[DeferFetchEdges]) => {
     implicit val ec = ctx.ec
@@ -58,7 +64,7 @@ object GraphRepository {
     }
 
     val f: Future[Iterable[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam)
-    val grouped = f.map { tpLs =>
+    val grouped: Future[Seq[(S2VertexLike, QueryParam, Seq[S2EdgeLike])]] = f.map { tpLs =>
       tpLs.toSeq.flatMap { case (qp, edges) =>
         edges.groupBy(_.srcForVertex).map { case (v, edges) => (v, qp, edges) }
       }
@@ -121,8 +127,13 @@ class GraphRepository(val graph: S2GraphLike) {
     graph.mutateEdges(edges, withWait = true)
   }
 
-  def getVertices(vertex: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
-    graph.getVertices(vertex)
+  // Delegates to S2Graph.searchVertices; any other S2GraphLike yields a failed
+  // Future instead of throwing a ClassCastException.
+  def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = graph match {
+    case g: S2Graph => g.searchVertices(queryParam)
+    case other =>
+      Future.failed(new UnsupportedOperationException(
+        s"searchVertices is not supported by ${other.getClass.getName}"))
   }
 
   def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 4f092dd..64650d3 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -53,6 +53,8 @@ object FieldResolver {
     val ids = c.argOpt[Any]("id").toSeq ++ c.argOpt[List[Any]]("ids").toList.flatten
     val vertices = ids.map(vid => c.ctx.toS2VertexLike(vid, column))
 
+    // NOTE(review): appears unused; S2Type's resolver reads "search" via c.argOpt itself.
+    val search = c.argOpt[String]("search")
     val columnFields = column.metasInvMap.keySet
     val selectedFields = AstHelper.selectedFields(c.astFields)
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
index 9066d21..4ba6680 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
@@ -138,15 +138,31 @@ object S2Type {
         ListType(ColumnType),
         arguments = List(
           Argument("id", OptionInputType(toScalarType(column.columnType))),
-          Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType))))
+          Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType)))),
+          Argument("search", OptionInputType(StringType))
         ),
         description = Option("desc here"),
         resolve = c => {
           implicit val ec = c.ctx.ec
           val (vertices, canSkipFetchVertex) = FieldResolver.serviceColumnOnService(column, c)
+          // A `search` query is confined to this service and column. A non-blank query
+          // is parenthesised so its own OR/AND operators stay inside that scope
+          // (NOTE(review): unbalanced parentheses in user input are not escaped).
+          val scope = Seq(
+            s"${GlobalIndex.serviceField}:${service.serviceName}",
+            s"${GlobalIndex.serviceColumnField}:${column.columnName}"
+          ).mkString(" AND ")
+          val searchOpt = c.argOpt[String]("search").map { qs =>
+            if (qs.trim.nonEmpty) s"$scope AND ($qs)"
+            else scope
+          }
+
+          // NOTE(review): offset 0 / limit 100 are hard-coded; consider exposing
+          // them as field arguments.
+          val vertexQueryParam = VertexQueryParam(0, 100, searchOpt, vertices.map(_.id))
 
-          if (canSkipFetchVertex) Future.successful(vertices)
-          else GraphRepository.vertexFetcher.deferSeq(vertices)
+          // NOTE(review): canSkipFetchVertex is not consulted; every request goes through getVertices.
+          c.ctx.getVertices(vertexQueryParam)
         }
       ): Field[GraphRepository, Any]
     }
@@ -173,8 +189,9 @@ object S2Type {
       implicit val ec = c.ctx.ec
       val (vertex, canSkipFetchVertex) = FieldResolver.serviceColumnOnLabel(c)
 
-      if (canSkipFetchVertex) Future.successful(vertex)
-      else GraphRepository.vertexFetcher.defer(vertex)
+      // NOTE(review): canSkipFetchVertex is ignored; the vertex returned by
+      // serviceColumnOnLabel is used as-is, with no fetch through vertexFetcher.
+      Future.successful(vertex)
     })
 
     lazy val EdgeType = ObjectType(