You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/23 04:29:34 UTC
[2/8] incubator-s2graph git commit: add search vertices
add search vertices
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/ba938bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/ba938bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/ba938bc0
Branch: refs/heads/master
Commit: ba938bc087898ba6221bbb53ea62e0ec12185fe3
Parents: 3ac20b4
Author: daewon <da...@apache.org>
Authored: Thu Apr 19 18:13:34 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Thu Apr 19 18:13:34 2018 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/QueryParam.scala | 91 +++++++++-------
.../scala/org/apache/s2graph/core/S2Graph.scala | 33 ++++--
.../s2graph/core/index/ESIndexProvider.scala | 67 ++++++------
.../s2graph/core/index/IndexProvider.scala | 1 +
.../core/index/LuceneIndexProvider.scala | 107 +++++++++++--------
.../s2graph/core/index/IndexProviderTest.scala | 23 ++--
.../graphql/repository/GraphRepository.scala | 20 ++--
.../s2graph/graphql/types/FieldResolver.scala | 2 +
.../apache/s2graph/graphql/types/S2Type.scala | 27 ++++-
9 files changed, 229 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 748e8c5..faf04db 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -37,9 +37,11 @@ import scala.util.{Success, Try}
object Query {
val initialScore = 1.0
lazy val empty = Query()
+
def apply(query: Query): Query = {
Query(query.vertices, query.steps, query.queryOption, query.jsonQuery)
}
+
def toQuery(srcVertices: Seq[S2VertexLike], queryParams: Seq[QueryParam]) = Query(srcVertices, Vector(Step(queryParams)))
}
@@ -49,9 +51,10 @@ case class MinShouldMatchParam(prop: String, count: Int, terms: Set[Any])
object GroupBy {
val Empty = GroupBy()
}
+
case class GroupBy(keys: Seq[String] = Nil,
limit: Int = Int.MaxValue,
- minShouldMatch: Option[MinShouldMatchParam]= None)
+ minShouldMatch: Option[MinShouldMatchParam] = None)
case class MultiQuery(queries: Seq[Query],
weights: Seq[Double],
@@ -79,12 +82,12 @@ case class QueryOption(removeCycle: Boolean = false,
ignorePrevStepCache: Boolean = false) {
val orderByKeys = orderByColumns.map(_._1)
val ascendingVals = orderByColumns.map(_._2)
- val selectColumnsMap = selectColumns.map { c => c -> true } .toMap
+ val selectColumnsMap = selectColumns.map { c => c -> true }.toMap
val scoreFieldIdx = orderByKeys.zipWithIndex.find(t => t._1 == "score").map(_._2).getOrElse(-1)
val (edgeSelectColumns, propsSelectColumns) = selectColumns.partition(c => LabelMeta.defaultRequiredMetaNames.contains(c))
/** */
val edgeSelectColumnsFiltered = edgeSelectColumns
-// val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c))
+ // val edgeSelectColumnsFiltered = edgeSelectColumns.filterNot(c => groupBy.keys.contains(c))
lazy val cacheKeyBytes: Array[Byte] = {
val selectBytes = Bytes.toBytes(selectColumns.toString)
val groupBytes = Bytes.toBytes(groupBy.keys.toString)
@@ -118,9 +121,10 @@ object EdgeTransformer {
}
/**
- * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold.
- * @param jsValue
- */
+ * TODO: step wise outputFields should be used with nextStepLimit, nextStepThreshold.
+ *
+ * @param jsValue
+ */
case class EdgeTransformer(jsValue: JsValue) {
val Delimiter = "\\$"
val targets = jsValue.asOpt[List[Vector[String]]].toList
@@ -204,13 +208,15 @@ case class Step(queryParams: Seq[QueryParam],
cacheTTL: Long = -1,
groupBy: GroupBy = GroupBy.Empty) {
-// lazy val excludes = queryParams.filter(_.exclude)
-// lazy val includes = queryParams.filterNot(_.exclude)
-// lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
+ // lazy val excludes = queryParams.filter(_.exclude)
+ // lazy val includes = queryParams.filterNot(_.exclude)
+ // lazy val excludeIds = excludes.map(x => x.labelWithDir.labelId -> true).toMap
lazy val cacheKeyBytes = queryParams.map(_.toCacheKeyRaw(Array.empty[Byte])).foldLeft(Array.empty[Byte])(Bytes.add)
+
def toCacheKey(lss: Seq[Long]): Long = Hashing.murmur3_128().hashBytes(toCacheKeyRaw(lss)).asLong()
-// MurmurHash3.bytesHash(toCacheKeyRaw(lss))
+
+ // MurmurHash3.bytesHash(toCacheKeyRaw(lss))
def toCacheKeyRaw(lss: Seq[Long]): Array[Byte] = {
var bytes = Array.empty[Byte]
@@ -277,6 +283,7 @@ object QueryParam {
val Delimiter = ","
val maxMetaByte = (-1).toByte
val fillArray = Array.fill(100)(maxMetaByte)
+
import scala.collection.JavaConverters._
def apply(labelWithDirection: LabelWithDirection): QueryParam = {
@@ -285,34 +292,43 @@ object QueryParam {
QueryParam(labelName = label.label, direction = direction)
}
}
+
+case class VertexQueryParam(offset: Int,
+ limit: Int,
+ searchString: Option[String],
+ vertexIds: Seq[VertexId] = Nil,
+ fetchProp: Boolean = true) {
+}
+
case class QueryParam(labelName: String,
- direction: String = "out",
- offset: Int = 0,
- limit: Int = S2Graph.DefaultFetchLimit,
- sample: Int = -1,
- maxAttempt: Int = 20,
- rpcTimeout: Int = 600000,
- cacheTTLInMillis: Long = -1L,
- indexName: String = LabelIndex.DefaultName,
- where: Try[Where] = Success(WhereParser.success),
- timestamp: Long = System.currentTimeMillis(),
- threshold: Double = Double.MinValue,
- rank: RankParam = RankParam.Default,
- intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None,
- durationOpt: Option[(Long, Long)] = None,
- exclude: Boolean = false,
- include: Boolean = false,
- has: Map[String, Any] = Map.empty,
- duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First,
- includeDegree: Boolean = false,
- scorePropagateShrinkage: Long = 500L,
- scorePropagateOp: String = "multiply",
- shouldNormalize: Boolean = false,
- whereRawOpt: Option[String] = None,
- cursorOpt: Option[String] = None,
- tgtVertexIdOpt: Option[Any] = None,
- edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson),
- timeDecay: Option[TimeDecay] = None) {
+ direction: String = "out",
+ offset: Int = 0,
+ limit: Int = S2Graph.DefaultFetchLimit,
+ sample: Int = -1,
+ maxAttempt: Int = 20,
+ rpcTimeout: Int = 600000,
+ cacheTTLInMillis: Long = -1L,
+ indexName: String = LabelIndex.DefaultName,
+ where: Try[Where] = Success(WhereParser.success),
+ timestamp: Long = System.currentTimeMillis(),
+ threshold: Double = Double.MinValue,
+ rank: RankParam = RankParam.Default,
+ intervalOpt: Option[((Seq[(String, JsValue)]), Seq[(String, JsValue)])] = None,
+ durationOpt: Option[(Long, Long)] = None,
+ exclude: Boolean = false,
+ include: Boolean = false,
+ has: Map[String, Any] = Map.empty,
+ duplicatePolicy: DuplicatePolicy = DuplicatePolicy.First,
+ includeDegree: Boolean = false,
+ scorePropagateShrinkage: Long = 500L,
+ scorePropagateOp: String = "multiply",
+ shouldNormalize: Boolean = false,
+ whereRawOpt: Option[String] = None,
+ cursorOpt: Option[String] = None,
+ tgtVertexIdOpt: Option[Any] = None,
+ edgeTransformer: EdgeTransformer = EdgeTransformer(EdgeTransformer.DefaultJson),
+ timeDecay: Option[TimeDecay] = None) {
+
import JSONParser._
//TODO: implement this.
@@ -447,6 +463,7 @@ object DuplicatePolicy extends Enumeration {
}
}
}
+
case class TimeDecay(initial: Double = 1.0,
lambda: Double = 0.1,
timeUnit: Double = 60 * 60 * 24,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 09f9c7c..2dc9f63 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -33,7 +33,8 @@ import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.apache.s2graph.core.storage.{MutateResponse, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
+import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
import scala.collection.JavaConversions._
@@ -58,7 +59,7 @@ object S2Graph {
"hbase.table.name" -> "s2graph",
"hbase.table.compression.algorithm" -> "gz",
"phase" -> "dev",
- "db.default.driver" -> "org.h2.Driver",
+ "db.default.driver" -> "org.h2.Driver",
"db.default.url" -> "jdbc:h2:file:./var/metastore;MODE=MYSQL",
"db.default.password" -> "graph",
"db.default.user" -> "graph",
@@ -110,7 +111,7 @@ object S2Graph {
} {
m.put(key, value)
}
- val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
+ val config = ConfigFactory.parseMap(m).withFallback(DefaultConfig)
config
}
@@ -134,7 +135,7 @@ object S2Graph {
storageBackend match {
case "hbase" =>
- hbaseExecutor =
+ hbaseExecutor =
if (config.getString("hbase.zookeeper.quorum") == "localhost")
AsynchbaseStorage.initLocalHBase(config)
else
@@ -176,7 +177,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override val config = _config.withFallback(S2Graph.DefaultConfig)
- val storageBackend = Try { config.getString("s2graph.storage.backend") }.getOrElse("hbase")
+ val storageBackend = Try {
+ config.getString("s2graph.storage.backend")
+ }.getOrElse("hbase")
Model.apply(config)
Model.loadCache()
@@ -260,6 +263,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
localLongId.set(0l)
}
+ def searchVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
+ val matchedVertices = indexProvider.fetchVertexIdsAsyncRaw(queryParam).map { vids =>
+ (queryParam.vertexIds ++ vids).distinct.map(vid => elementBuilder.newVertex(vid))
+ }
+
+ if (queryParam.fetchProp) matchedVertices.flatMap(getVertices)
+ else matchedVertices
+ }
+
override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
@@ -289,7 +301,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
- val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) }
+ val futures = vertices.map { vertex =>
+ storage.mutateVertex(zkQuorum, vertex, withWait)
+ }
Future.sequence(futures)
}
@@ -297,7 +311,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
- Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
+
+ Future.sequence(futures).flatMap { ls =>
+ indexProvider.mutateVerticesAsync(vertices).map { _ =>
+ ls.flatten.toSeq.sortBy(_._2).map(_._1)
+ }
+ }
}
override def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
index 10c9222..fbf76ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/ESIndexProvider.scala
@@ -25,12 +25,13 @@ import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.HttpClient
import com.typesafe.config.Config
+import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait.Predicate
import org.apache.s2graph.core.io.Conversions
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.VertexId
-import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import play.api.libs.json.Json
+import play.api.libs.json.{Json, Reads}
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
@@ -38,6 +39,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try
class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends IndexProvider {
+
import GlobalIndex._
import IndexProvider._
@@ -103,10 +105,10 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
override def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]] = {
val bulkRequests = vertices.flatMap { vertex =>
- toFields(vertex, forceToIndex).toSeq.map { fields =>
- update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
- }
+ toFields(vertex, forceToIndex).toSeq.map { fields =>
+ update(vertex.id.toString()).in(new IndexAndType(GlobalIndex.VertexIndexName, GlobalIndex.TypeName)).docAsUpsert(fields)
}
+ }
if (bulkRequests.isEmpty) Future.successful(vertices.map(_ => true))
else {
@@ -149,61 +151,56 @@ class ESIndexProvider(config: Config)(implicit ec: ExecutionContext) extends Ind
}
}
- override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] =
- Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime)
-
- override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = {
- val field = eidField
- val ids = new java.util.HashSet[EdgeId]
-
- val queryString = buildQueryString(hasContainers)
+ private def fetchInner[T](queryString: String, indexKey: String, field: String, reads: Reads[T])(validate: (T => Boolean)): Future[util.List[T]] = {
+ val ids = new java.util.HashSet[T]
client.execute {
- search(GlobalIndex.EdgeIndexName).query(queryString)
+ search(indexKey).query(queryString)
}.map { ret =>
ret match {
case Left(failure) =>
case Right(results) =>
results.result.hits.hits.foreach { searchHit =>
searchHit.sourceAsMap.get(field).foreach { idValue =>
- val id = Conversions.s2EdgeIdReads.reads(Json.parse(idValue.toString)).get
-
+ val id = reads.reads(Json.parse(idValue.toString)).get
//TODO: Come up with better way to filter out hits with invalid meta.
- EdgeId.isValid(id).foreach(ids.add)
+ if (validate(id)) ids.add(id)
}
}
}
- new util.ArrayList[EdgeId](ids)
+ new util.ArrayList(ids)
}
}
+ override def fetchEdgeIds(hasContainers: util.List[HasContainer]): util.List[EdgeId] =
+ Await.result(fetchEdgeIdsAsync(hasContainers), WaitTime)
+
+ override def fetchEdgeIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[EdgeId]] = {
+ val field = eidField
+
+ val queryString = buildQueryString(hasContainers)
+ fetchInner[EdgeId](queryString, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)(e => EdgeId.isValid(e).isDefined)
+ }
override def fetchVertexIds(hasContainers: util.List[HasContainer]): util.List[VertexId] =
Await.result(fetchVertexIdsAsync(hasContainers), WaitTime)
override def fetchVertexIdsAsync(hasContainers: util.List[HasContainer]): Future[util.List[VertexId]] = {
val field = vidField
- val ids = new java.util.HashSet[VertexId]
-
val queryString = buildQueryString(hasContainers)
- client.execute {
- search(GlobalIndex.VertexIndexName).query(queryString)
- }.map { ret =>
- ret match {
- case Left(failure) =>
- case Right(results) =>
- results.result.hits.hits.foreach { searchHit =>
- searchHit.sourceAsMap.get(field).foreach { idValue =>
- val id = Conversions.s2VertexIdReads.reads(Json.parse(idValue.toString)).get
- //TODO: Come up with better way to filter out hits with invalid meta.
- VertexId.isValid(id).foreach(ids.add)
- }
- }
- }
+ fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
+ }
+
+ override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = {
+ val field = vidField
+ val empty = new util.ArrayList[VertexId]()
- new util.ArrayList[VertexId](ids)
+ vertexQueryParam.searchString match {
+ case Some(queryString) =>
+ fetchInner[VertexId](queryString, GlobalIndex.VertexIndexName, field, Conversions.s2VertexIdReads)(v => VertexId.isValid(v).isDefined)
+ case None => Future.successful(empty)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index ae632df..ffbebf4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -121,6 +121,7 @@ trait IndexProvider {
def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId]
def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[java.util.List[VertexId]]
+ def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[java.util.List[VertexId]] = Future.successful(util.Arrays.asList())
def mutateVertices(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Seq[Boolean]
def mutateVerticesAsync(vertices: Seq[S2VertexLike], forceToIndex: Boolean = false): Future[Seq[Boolean]]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
index 841331d..68d481c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/LuceneIndexProvider.scala
@@ -19,6 +19,7 @@
package org.apache.s2graph.core.index
+import java.io.File
import java.util
import com.typesafe.config.Config
@@ -26,20 +27,21 @@ import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, StringField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
-import org.apache.lucene.search.IndexSearcher
-import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
+import org.apache.lucene.search.{IndexSearcher, Query}
+import org.apache.lucene.store.{BaseDirectory, RAMDirectory, SimpleFSDirectory}
import org.apache.s2graph.core.io.Conversions
import org.apache.s2graph.core.mysqls.GlobalIndex
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike}
+import org.apache.s2graph.core.{EdgeId, S2EdgeLike, S2VertexLike, VertexQueryParam}
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import play.api.libs.json.Json
+import play.api.libs.json.{Json, Reads}
import scala.concurrent.Future
class LuceneIndexProvider(config: Config) extends IndexProvider {
+
import GlobalIndex._
import IndexProvider._
@@ -49,10 +51,19 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
val analyzer = new StandardAnalyzer()
val writers = mutable.Map.empty[String, IndexWriter]
val directories = mutable.Map.empty[String, BaseDirectory]
+ val baseDirectory = scala.util.Try(config.getString("index.provider.base.dir")).getOrElse(".")
+
+ private def getOrElseDirectory(indexName: String): BaseDirectory = {
+ val pathname = s"${baseDirectory}/${indexName}"
+ val dir = directories.getOrElseUpdate(indexName, new SimpleFSDirectory(new File(pathname).toPath))
+
+ dir
+ }
private def getOrElseCreateIndexWriter(indexName: String): IndexWriter = {
writers.getOrElseUpdate(indexName, {
- val dir = directories.getOrElseUpdate(indexName, new RAMDirectory())
+ val dir = getOrElseDirectory(indexName)
+
val indexConfig = new IndexWriterConfig(analyzer)
new IndexWriter(dir, indexConfig)
})
@@ -124,8 +135,9 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
vertices.foreach { vertex =>
toDocument(vertex, forceToIndex).foreach { doc =>
val vId = vertex.id.toString()
+// logger.error(s"DOC: ${doc}")
- writer.updateDocument(new Term(IdField, vId), doc)
+ writer.updateDocument(new Term(vidField, vId), doc)
}
}
@@ -144,7 +156,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
toDocument(edge, forceToIndex).foreach { doc =>
val eId = edge.edgeId.toString
- writer.updateDocument(new Term(IdField, eId), doc)
+ writer.updateDocument(new Term(eidField, eId), doc)
}
}
@@ -153,73 +165,84 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
edges.map(_ => true)
}
- override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
- val field = eidField
- val ids = new java.util.HashSet[EdgeId]
-
- val queryString = buildQueryString(hasContainers)
+ private def fetchInner[T](q: Query, indexKey: String, field: String, reads: Reads[T]): util.List[T] = {
+ val ids = new java.util.HashSet[T]
+ var reader: DirectoryReader = null
try {
- val q = new QueryParser(field, analyzer).parse(queryString)
-
- val reader = DirectoryReader.open(directories(GlobalIndex.EdgeIndexName))
+ val reader = DirectoryReader.open(getOrElseDirectory(indexKey))
val searcher = new IndexSearcher(reader)
val docs = searcher.search(q, hitsPerPage)
+ logger.error(s"total hit: ${docs.scoreDocs.length}")
docs.scoreDocs.foreach { scoreDoc =>
val document = searcher.doc(scoreDoc.doc)
- val id = Conversions.s2EdgeIdReads.reads(Json.parse(document.get(field))).get
- ids.add(id);
- }
+ logger.error(s"DOC_IN_L: ${document.toString}")
- reader.close()
- ids
+ val id = reads.reads(Json.parse(document.get(field))).get
+ ids.add(id)
+ }
} catch {
- case ex: ParseException =>
- logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
- ids
+ case e: org.apache.lucene.index.IndexNotFoundException => logger.info("Index file not found.")
+ } finally {
+ if (reader != null) reader.close()
}
- new util.ArrayList[EdgeId](ids)
+ new util.ArrayList[T](ids)
}
override def fetchVertexIds(hasContainers: java.util.List[HasContainer]): java.util.List[VertexId] = {
val field = vidField
- val ids = new java.util.HashSet[VertexId]
val queryString = buildQueryString(hasContainers)
try {
val q = new QueryParser(field, analyzer).parse(queryString)
+ fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+ } catch {
+ case ex: ParseException =>
+ logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+ util.Arrays.asList[VertexId]()
+ }
+ }
- val reader = DirectoryReader.open(directories(GlobalIndex.VertexIndexName))
- val searcher = new IndexSearcher(reader)
-
- val docs = searcher.search(q, hitsPerPage)
-
- docs.scoreDocs.foreach { scoreDoc =>
- val document = searcher.doc(scoreDoc.doc)
- val id = Conversions.s2VertexIdReads.reads(Json.parse(document.get(field))).get
- ids.add(id)
- }
+ override def fetchEdgeIds(hasContainers: java.util.List[HasContainer]): java.util.List[EdgeId] = {
+ val field = eidField
+ val queryString = buildQueryString(hasContainers)
- reader.close()
- ids
+ try {
+ val q = new QueryParser(field, analyzer).parse(queryString)
+ fetchInner[EdgeId](q, GlobalIndex.EdgeIndexName, field, Conversions.s2EdgeIdReads)
} catch {
case ex: ParseException =>
logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
- ids
+ util.Arrays.asList[EdgeId]()
}
+ }
+
+ override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
- new util.ArrayList[VertexId](ids)
+ override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
+
+ override def fetchVertexIdsAsyncRaw(vertexQueryParam: VertexQueryParam): Future[util.List[VertexId]] = {
+ val ret = vertexQueryParam.searchString.fold(util.Arrays.asList[VertexId]()) { queryString =>
+ val field = vidField
+ try {
+ val q = new QueryParser(field, analyzer).parse(queryString)
+ fetchInner[VertexId](q, GlobalIndex.VertexIndexName, vidField, Conversions.s2VertexIdReads)
+ } catch {
+ case ex: ParseException =>
+ logger.error(s"[IndexProvider]: ${queryString} parse failed.", ex)
+ util.Arrays.asList[VertexId]()
+ }
+ }
+
+ Future.successful(ret)
}
override def shutdown(): Unit = {
writers.foreach { case (_, writer) => writer.close() }
}
- override def fetchEdgeIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[EdgeId]] = Future.successful(fetchEdgeIds(hasContainers))
-
- override def fetchVertexIdsAsync(hasContainers: java.util.List[HasContainer]): Future[util.List[VertexId]] = Future.successful(fetchVertexIds(hasContainers))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 4889412..a5349dc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -29,6 +29,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper
import org.apache.tinkerpop.gremlin.structure.T
import scala.collection.JavaConversions._
+import scala.concurrent._
+import scala.concurrent.duration._
class IndexProviderTest extends IntegrateCommon {
import scala.concurrent.ExecutionContext.Implicits.global
@@ -74,14 +76,19 @@ class IndexProviderTest extends IntegrateCommon {
Thread.sleep(1000)
(0 until numOfTry).foreach { ith =>
- val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1)))
-
- var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
- ids.head shouldBe vertex.id
-
- ids.foreach { id =>
- println(s"[Id]: $id")
- }
+// val hasContainer = new HasContainer(indexPropsColumnMeta.name, P.eq(Long.box(1)))
+ val hasContainer = new HasContainer(GlobalIndex.serviceColumnField, P.eq(testColumn.columnName))
+
+ val f = graph.searchVertices(VertexQueryParam(0, 100, Option(s"${GlobalIndex.serviceColumnField}:${testColumn.columnName}")))
+ val a = Await.result(f, Duration("60 sec"))
+ println(a)
+
+// var ids = indexProvider.fetchVertexIds(Seq(hasContainer))
+// ids.head shouldBe vertex.id
+//
+// ids.foreach { id =>
+// println(s"[Id]: $id")
+// }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
index c7ffae1..1f53c76 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/repository/GraphRepository.scala
@@ -30,13 +30,14 @@ import org.slf4j.{Logger, LoggerFactory}
import sangria.execution.deferred._
import sangria.schema._
+import scala.collection.immutable
import scala.concurrent._
import scala.util.{Failure, Success, Try}
object GraphRepository {
- implicit val vertexHasId = new HasId[S2VertexLike, S2VertexLike] {
- override def id(value: S2VertexLike): S2VertexLike = value
+ implicit val vertexHasId = new HasId[(VertexQueryParam, Seq[S2VertexLike]), VertexQueryParam] {
+ override def id(value: (VertexQueryParam, Seq[S2VertexLike])): VertexQueryParam = value._1
}
implicit val edgeHasId = new HasId[(S2VertexLike, QueryParam, Seq[S2EdgeLike]), DeferFetchEdges] {
@@ -44,9 +45,12 @@ object GraphRepository {
DeferFetchEdges(value._1, value._2)
}
- val vertexFetcher = Fetcher((ctx: GraphRepository, ids: Seq[S2VertexLike]) => {
- ctx.getVertices(ids)
- })
+ val vertexFetcher =
+ Fetcher((ctx: GraphRepository, ids: Seq[VertexQueryParam]) => {
+ implicit val ec = ctx.ec
+
+ Future.traverse(ids)(ctx.getVertices).map(vs => ids.zip(vs))
+ })
val edgeFetcher = Fetcher((ctx: GraphRepository, ids: Seq[DeferFetchEdges]) => {
implicit val ec = ctx.ec
@@ -58,7 +62,7 @@ object GraphRepository {
}
val f: Future[Iterable[(QueryParam, Seq[S2EdgeLike])]] = Future.sequence(edgesByParam)
- val grouped = f.map { tpLs =>
+ val grouped: Future[Seq[(S2VertexLike, QueryParam, Seq[S2EdgeLike])]] = f.map { tpLs =>
tpLs.toSeq.flatMap { case (qp, edges) =>
edges.groupBy(_.srcForVertex).map { case (v, edges) => (v, qp, edges) }
}
@@ -121,8 +125,8 @@ class GraphRepository(val graph: S2GraphLike) {
graph.mutateEdges(edges, withWait = true)
}
- def getVertices(vertex: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
- graph.getVertices(vertex)
+ def getVertices(queryParam: VertexQueryParam): Future[Seq[S2VertexLike]] = {
+ graph.asInstanceOf[S2Graph].searchVertices(queryParam)
}
def getEdges(vertices: Seq[S2VertexLike], queryParam: QueryParam): Future[Seq[S2EdgeLike]] = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
index 4f092dd..64650d3 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/FieldResolver.scala
@@ -53,6 +53,8 @@ object FieldResolver {
val ids = c.argOpt[Any]("id").toSeq ++ c.argOpt[List[Any]]("ids").toList.flatten
val vertices = ids.map(vid => c.ctx.toS2VertexLike(vid, column))
+ val search = c.argOpt[String]("search")
+
val columnFields = column.metasInvMap.keySet
val selectedFields = AstHelper.selectedFields(c.astFields)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/ba938bc0/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
index 9066d21..4ba6680 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/types/S2Type.scala
@@ -138,15 +138,31 @@ object S2Type {
ListType(ColumnType),
arguments = List(
Argument("id", OptionInputType(toScalarType(column.columnType))),
- Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType))))
+ Argument("ids", OptionInputType(ListInputType(toScalarType(column.columnType)))),
+ Argument("search", OptionInputType(StringType))
),
description = Option("desc here"),
resolve = c => {
implicit val ec = c.ctx.ec
val (vertices, canSkipFetchVertex) = FieldResolver.serviceColumnOnService(column, c)
+ val searchOpt = c.argOpt[String]("search").map { qs =>
+ val prefix = s"${GlobalIndex.serviceField}:${service.serviceName} AND ${GlobalIndex.serviceColumnField}:${column.columnName}"
+ if (qs.trim.nonEmpty) Seq(prefix, qs).mkString(" AND ")
+ else prefix
+ qs
+ }
+
+ println(searchOpt)
+
+ val vertexQueryParam = VertexQueryParam(0, 100, searchOpt, vertices.map(_.id))
+
+// if (canSkipFetchVertex) Future.successful(vertices)
+// else GraphRepository.vertexFetcher.deferSeq(deferVertices)
+
+// val empty = Seq.empty[S2VertexLike]
+// DeferredValue(GraphRepository.vertexFetcher.defer(vertexQueryParam)).map(m => m._2)
- if (canSkipFetchVertex) Future.successful(vertices)
- else GraphRepository.vertexFetcher.deferSeq(vertices)
+ c.ctx.getVertices(vertexQueryParam)
}
): Field[GraphRepository, Any]
}
@@ -173,8 +189,9 @@ object S2Type {
implicit val ec = c.ctx.ec
val (vertex, canSkipFetchVertex) = FieldResolver.serviceColumnOnLabel(c)
- if (canSkipFetchVertex) Future.successful(vertex)
- else GraphRepository.vertexFetcher.defer(vertex)
+ // if (canSkipFetchVertex) Future.successful(vertex)
+ // else GraphRepository.vertexFetcher.defer(vertex)
+ Future.successful(vertex)
})
lazy val EdgeType = ObjectType(