You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/07/31 01:05:18 UTC
[04/25] incubator-s2graph git commit: add IndexProvider.
add IndexProvider.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7ddc2762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7ddc2762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7ddc2762
Branch: refs/heads/master
Commit: 7ddc276207a2ef8276d3f40067b64ac06876c48d
Parents: e3472de
Author: DO YUNG YOON <st...@apache.org>
Authored: Tue Jul 11 18:48:53 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Tue Jul 11 18:48:53 2017 +0900
----------------------------------------------------------------------
.../core/io/tinkerpop/optimize/S2GraphStep.java | 59 +++++++++++++++++++-
.../tinkerpop/optimize/S2GraphStepStrategy.java | 2 +
.../org/apache/s2graph/core/QueryParam.scala | 4 +-
.../s2graph/core/index/IndexProvider.scala | 27 +++++----
.../s2graph/core/index/IndexProviderTest.scala | 2 +-
.../core/tinkerpop/structure/S2GraphTest.scala | 5 +-
6 files changed, 82 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
----------------------------------------------------------------------
diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
index 4d6568b..384a88b 100644
--- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
+++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStep.java
@@ -1,27 +1,74 @@
package org.apache.s2graph.core.io.tinkerpop.optimize;
+import org.apache.s2graph.core.EdgeId;
+import org.apache.s2graph.core.QueryParam;
+import org.apache.s2graph.core.S2Graph;
+import org.apache.s2graph.core.index.IndexProvider;
+import org.apache.s2graph.core.index.IndexProvider$;
+import org.apache.s2graph.core.mysqls.Label;
import org.apache.s2graph.core.utils.logger;
import org.apache.tinkerpop.gremlin.process.traversal.Order;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.step.HasContainerHolder;
import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.HasStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.IdentityStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer;
import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public class S2GraphStep<S, E extends Element> extends GraphStep<S, E> {
private final List<HasContainer> hasContainers = new ArrayList<>();
+    /**
+     * Collects the HasContainers of the HasSteps that directly follow {@code originalStep} into
+     * this step's {@code hasContainers} list.
+     *
+     * IdentityStep and NoOpBarrierStep are stepped over; scanning stops at the first step of any
+     * other kind. This method does not remove the HasSteps from the traversal.
+     *
+     * @param originalStep the GraphStep this S2GraphStep replaces
+     */
+    private void foldInHasContainers(final GraphStep<?, ?> originalStep) {
+        Step<?, ?> currentStep = originalStep.getNextStep();
+        while (currentStep instanceof HasStep
+                || currentStep instanceof IdentityStep
+                || currentStep instanceof NoOpBarrierStep) {
+            if (currentStep instanceof HasStep) {
+                // HasStep is a HasContainerHolder; the typed view avoids a raw-type addAll.
+                hasContainers.addAll(((HasContainerHolder) currentStep).getHasContainers());
+            }
+            currentStep = currentStep.getNextStep();
+        }
+    }
+    /**
+     * Wraps {@code originalStep}, folds the has() filters that follow it into this step, and
+     * installs an iterator supplier that resolves elements by explicit id, by index lookup, or by
+     * the graph's own id-less lookup when there is nothing to push down.
+     *
+     * @param originalStep the GraphStep being replaced by the strategy
+     */
public S2GraphStep(final GraphStep<S, E> originalStep) {
super(originalStep.getTraversal(), originalStep.getReturnClass(), originalStep.isStartStep(), originalStep.getIds());
+
+        foldInHasContainers(originalStep);
originalStep.getLabels().forEach(this::addLabel);
- System.err.println("[[S2GraphStep]]");
+        // 1. build S2Graph QueryParams for this step.
+        // 2. graph.vertices(this.ids, queryParams) or graph.edges(this.ids, queryParams)
+        // 3. vertices/edges lookup indexProvider, then return Seq[EdgeId/VertexId]
+
+        this.setIteratorSupplier(() -> {
+            final S2Graph graph = (S2Graph) traversal.asAdmin().getGraph().get();
+            // g.V(...) vs g.E(...): every branch below must honour the step's return class.
+            final boolean returnsVertices = Vertex.class.isAssignableFrom(this.returnClass);
+
+            if (this.ids != null && this.ids.length > 0) {
+                // Explicit ids: look them up directly, then apply the folded has() filters in memory.
+                // (Previously this always called graph.vertices(..), so g.E(id) yielded vertices.)
+                return iteratorList((Iterator) (returnsVertices ? graph.vertices(this.ids) : graph.edges(this.ids)));
+            }
+
+            if (hasContainers.isEmpty()) {
+                // Nothing to push down: buildQueryString would return "", which the Lucene query
+                // parser rejects. Use the graph's id-less lookup instead.
+                // NOTE(review): confirm S2Graph's vertices()/edges() with no ids behave as a full scan.
+                return (Iterator) (returnsVertices ? graph.vertices() : graph.edges());
+            }
+
+            // Push the folded has() filters down to the index and resolve the returned ids.
+            final String queryString = IndexProvider$.MODULE$.buildQueryString(hasContainers);
+            final List<String> indexedIds = graph.indexProvider().fetchIds(queryString);
+            // Spread the ids into an Object[]: passing the List itself to the varargs
+            // vertices(Object...)/edges(Object...) would request a single id equal to the whole list.
+            final Object[] idArray = indexedIds.toArray();
+            return (Iterator) (returnsVertices ? graph.vertices(idArray) : graph.edges(idArray));
+        });
}
@Override
@@ -30,4 +77,14 @@ public class S2GraphStep<S, E extends Element> extends GraphStep<S, E> {
super.toString() : StringFactory.stepString(this, Arrays.toString(this.ids), this.hasContainers);
}
+    /**
+     * Drains {@code iterator} eagerly and returns an iterator over the elements, in their original
+     * order, that satisfy every HasContainer folded into this step.
+     *
+     * @param iterator source elements
+     * @return iterator over the matching elements
+     */
+    private <T extends Element> Iterator<T> iteratorList(final Iterator<T> iterator) {
+        final List<T> matched = new ArrayList<>();
+        iterator.forEachRemaining(element -> {
+            if (HasContainer.testAll(element, this.hasContainers)) {
+                matched.add(element);
+            }
+        });
+        return matched.iterator();
+    }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java
----------------------------------------------------------------------
diff --git a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java
index ea9ad7e..15d7699 100644
--- a/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java
+++ b/s2core/src/main/java/org/apache/s2graph/core/io/tinkerpop/optimize/S2GraphStepStrategy.java
@@ -26,6 +26,8 @@ public class S2GraphStepStrategy extends AbstractTraversalStrategy<TraversalStra
final S2GraphStep<?, ?> s2GraphStep = new S2GraphStep<>(originalGraphStep);
TraversalHelper.replaceStep(originalGraphStep, (Step) s2GraphStep, traversal);
+
+
} else {
//Make sure that any provided "start" elements are instantiated in the current transaction
// Object[] ids = originalGraphStep.getIds();
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 5b8543a..7e95f58 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -28,8 +28,9 @@ import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.rest.TemplateHelper
import org.apache.s2graph.core.storage.StorageSerializable._
import org.apache.s2graph.core.types._
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import org.hbase.async.ColumnRangeFilter
-import play.api.libs.json.{JsString, JsNull, JsValue, Json}
+import play.api.libs.json.{JsNull, JsString, JsValue, Json}
import scala.util.{Success, Try}
@@ -257,6 +258,7 @@ object QueryParam {
val Delimiter = ","
val maxMetaByte = (-1).toByte
val fillArray = Array.fill(100)(maxMetaByte)
+ import scala.collection.JavaConverters._
def apply(labelWithDirection: LabelWithDirection): QueryParam = {
val label = Label.findById(labelWithDirection.labelId)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index baf05d4..d3878af 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -11,6 +11,7 @@ import org.apache.s2graph.core.io.Conversions
import org.apache.s2graph.core.{EdgeId, S2Edge}
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.InnerValLike
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
import play.api.libs.json.Json
object IndexProvider {
@@ -23,11 +24,19 @@ object IndexProvider {
case "lucene" => new LuceneIndexProvider(config)
}
}
+
+  /**
+   * Builds a classic Lucene QueryParser query string that ANDs together one `key:"value"` clause
+   * per HasContainer.
+   *
+   * Keys are backslash-escaped and values are emitted as quoted phrases. Values containing
+   * whitespace or query metacharacters (`:`, `(`, `*`, ...) are therefore matched literally
+   * instead of being parsed as query syntax or spilling onto the parser's default field.
+   *
+   * NOTE(review): only the predicate's value is used, so this is only meaningful for equality
+   * has() filters; other predicates (gt, within, ...) are rendered as if they were equality.
+   * An empty list yields "", which the classic QueryParser cannot parse — callers must guard.
+   *
+   * @param hasContainers has() filters to translate
+   * @return the query string
+   */
+  def buildQueryString(hasContainers: java.util.List[HasContainer]): String = {
+    import scala.collection.JavaConverters._
+    hasContainers.asScala.map { container =>
+      escapeQuerySyntax(String.valueOf(container.getKey)) + ":" + quotePhrase(String.valueOf(container.getValue))
+    }.mkString(" AND ")
+  }
+
+  /** Characters the classic QueryParser treats as syntax. */
+  private val LuceneSpecialChars: Set[Char] = "\\+-!():^[]\"{}~*?|&/".toSet
+
+  /** Backslash-escapes query-syntax characters and whitespace so `term` parses as one literal term. */
+  private def escapeQuerySyntax(term: String): String =
+    term.flatMap { ch =>
+      if (LuceneSpecialChars.contains(ch) || ch.isWhitespace) "\\" + ch else ch.toString
+    }
+
+  /** Wraps `value` in double quotes, escaping embedded backslashes and double quotes. */
+  private def quotePhrase(value: String): String =
+    "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\""
+
}
trait IndexProvider {
//TODO: Seq nee do be changed into stream
- def fetchEdges(indexProps: Seq[(String, InnerValLike)]): Seq[EdgeId]
+ def fetchIds(queryString: String): java.util.List[String]
def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
@@ -57,24 +66,22 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
edges.map(_ => true)
}
- override def fetchEdges(indexProps: Seq[(String, InnerValLike)]): Seq[EdgeId] = {
- val queryStr = indexProps.map { case (name, value) =>
- name + ": " + value.toString()
- }.mkString(" AND ")
-
- val q = new QueryParser(edgeIdField, analyzer).parse(queryStr)
+  /**
+   * Parses `queryString` with the classic Lucene QueryParser (default field `edgeIdField`), runs
+   * it against the index, and returns the stored `edgeIdField` value of every matching document.
+   *
+   * All hits are returned. The previous fixed page size of 10 silently truncated results, and
+   * S2GraphStep relies on this lookup for its full-scan path. The reader is closed even if the
+   * search throws.
+   *
+   * NOTE(review): the hit limit is the index's maxDoc, so collector memory may grow with index
+   * size; consider paging with `searchAfter` (see the streaming TODO on the trait).
+   *
+   * @param queryString query in classic QueryParser syntax
+   * @return stored id strings of the matching documents
+   */
+  override def fetchIds(queryString: String): java.util.List[String] = {
+    val ids = new java.util.ArrayList[String]
+    val q = new QueryParser(edgeIdField, analyzer).parse(queryString)
- val hitsPerPage = 10
val reader = DirectoryReader.open(directory)
+    try {
+      val searcher = new IndexSearcher(reader)
+      // search() requires a positive hit limit, hence max(1, ...) for an empty index.
+      val docs = searcher.search(q, math.max(1, reader.maxDoc()))
+      docs.scoreDocs.foreach { scoreDoc =>
+        val document = searcher.doc(scoreDoc.doc)
+        ids.add(document.get(edgeIdField))
+      }
+    } finally {
+      reader.close()
+    }
+    ids
- val searcher = new IndexSearcher(reader)
- val docs = searcher.search(q, hitsPerPage)
- val ls = docs.scoreDocs.map { scoreDoc =>
- val document = searcher.doc(scoreDoc.doc)
- Conversions.s2EdgeIdReads.reads(Json.parse(document.get(edgeIdField))).get
- }
- reader.close()
- ls
}
override def shutdown(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
index 8f484ad..70f7c3c 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/index/IndexProviderTest.scala
@@ -25,7 +25,7 @@ class IndexProviderTest extends IntegrateCommon {
edges.foreach(e => logger.debug(s"[Edge]: $e"))
indexProvider.mutateEdges(edges)
- val edgeIds = indexProvider.fetchEdges(Seq("time" -> InnerVal.withLong(10, "v4")))
+ val edgeIds = indexProvider.fetchIds("time: 10")
edgeIds.foreach { edgeId =>
logger.debug(s"[EdgeId]: $edgeId")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7ddc2762/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
index d9aa3bc..1ef7890 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/structure/S2GraphTest.scala
@@ -463,10 +463,7 @@ class S2GraphTest extends FunSuite with Matchers with TestCommonWithModels {
val e12 = v6.addEdge("created", v3, "weight", Double.box(0.2))
- val ls = graph.traversal().V().choose(new Predicate[Vertex] {
- override def test(t: Vertex): Boolean =
- t.label().equals("person")
- }, out("knows"), in("created")).values("name").asAdmin()
+ val ls = graph.traversal().V().has("name", "josh")
val l = ls.toList
logger.error(s"[Size]: ${l.size}")