You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/07/09 06:32:04 UTC

[1/2] incubator-s2graph git commit: make step works

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 701f0eeac -> 08d6a3edd


make step works


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6b088944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6b088944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6b088944

Branch: refs/heads/master
Commit: 6b088944b877b25775ac424658ad9b6472203082
Parents: 701f0ee
Author: daewon <da...@apache.org>
Authored: Mon Jul 2 15:28:17 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Mon Jul 2 15:28:17 2018 +0900

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 s2core/build.sbt                                |   3 +-
 .../org/apache/s2graph/core/step/IStep.scala    |  58 ++++++++++
 .../s2graph/core/step/GraphStepTest.scala       | 110 +++++++++++++++++++
 .../org/apache/s2graph/core/step/StepTest.scala |  78 +++++++++++++
 5 files changed, 249 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index ad5a5e9..0066e18 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,6 +9,7 @@
 .cache
 .history
 .lib/
+*/lib/
 var/*
 dist/*
 target/

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 229bf41..0368715 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -57,7 +57,8 @@ libraryDependencies ++= Seq(
   "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
   "org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
   "net.pishen" %% "annoy4s" % annoy4sVersion,
-  "org.tensorflow" % "tensorflow" % tensorflowVersion
+  "org.tensorflow" % "tensorflow" % tensorflowVersion,
+  "io.reactivex" %% "rxscala" % "0.26.5"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
new file mode 100644
index 0000000..e0e23f6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
@@ -0,0 +1,58 @@
+package org.apache.s2graph.core.step
+
+import org.apache.s2graph.core._
+import rx.lang.scala.Observable
+
+import scala.language.higherKinds
+import scala.language.existentials
+
+trait RxStep[-A, +B] extends (A => Observable[B])
+
+object RxStep {
+
+  case class VertexFetchStep(g: S2GraphLike) extends RxStep[Seq[S2VertexLike], S2VertexLike] {
+    override def apply(vertices: Seq[S2VertexLike]): Observable[S2VertexLike] = {
+      Observable.from(vertices)
+    }
+  }
+
+  case class EdgeFetchStep(g: S2GraphLike, qp: QueryParam) extends RxStep[S2VertexLike, S2EdgeLike] {
+    override def apply(v: S2VertexLike): Observable[S2EdgeLike] = {
+      implicit val ec = g.ec
+
+      val step = org.apache.s2graph.core.Step(Seq(qp))
+      val q = Query(Seq(v), steps = Vector(step))
+
+      val f = g.getEdges(q).map { stepResult =>
+        val edges = stepResult.edgeWithScores.map(_.edge)
+        Observable.from(edges)
+      }
+
+      Observable.from(f).flatten
+    }
+  }
+
+  private def merge[A, B](steps: RxStep[A, B]*): RxStep[A, B] = new RxStep[A, B] {
+    override def apply(in: A): Observable[B] =
+      steps.map(_.apply(in)).toObservable.flatten
+  }
+
+  def toObservable(q: Query)(implicit graph: S2GraphLike): Observable[S2EdgeLike] = {
+    val v1: Observable[S2VertexLike] = VertexFetchStep(graph).apply(q.vertices)
+
+    val serialSteps = q.steps.map { step =>
+      val parallelSteps = step.queryParams.map(qp => EdgeFetchStep(graph, qp))
+      merge(parallelSteps: _*)
+    }
+
+    v1.flatMap { v =>
+      val initOpt = serialSteps.headOption.map(_.apply(v))
+
+      initOpt.map { init =>
+        serialSteps.tail.foldLeft(init) { case (prev, next) =>
+          prev.map(_.tgtForVertex).flatMap(next)
+        }
+      }.getOrElse(Observable.empty)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
new file mode 100644
index 0000000..96e49a0
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.core.step
+
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.parsers.Where
+import org.apache.s2graph.core.rest.RequestParser
+import play.api.libs.json.Json
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
+
+class GraphStepTest extends IntegrateCommon {
+
+  import TestUtil._
+  import RxStep._
+
+  val insert = "insert"
+  val e = "e"
+  val weight = "weight"
+  val is_hidden = "is_hidden"
+
+  override def initTestData(): Unit = {
+    super.initTestData()
+
+    insertEdgesSync(
+      toEdge(1000, insert, e, 1, 10, testLabelName),
+      toEdge(1000, insert, e, 2, 20, testLabelName),
+      toEdge(1000, insert, e, 3, 30, testLabelName),
+
+      toEdge(1000, insert, e, 100, 1, testLabelName, Json.obj(weight -> 30))
+    )
+  }
+
+  test("basic compose") {
+    val vertices = Seq(
+      graph.toVertex(testServiceName, testColumnName, 1),
+      graph.toVertex(testServiceName, testColumnName, 2),
+      graph.toVertex(testServiceName, testColumnName, 3),
+
+      graph.toVertex(testServiceName, testColumnName, 10)
+    )
+
+    val v1 = VertexFetchStep(graph)
+
+    val qpIn = QueryParam(labelName = testLabelName, direction = "in")
+    val qpOut = QueryParam(labelName = testLabelName, direction = "out")
+
+    val e1 = EdgeFetchStep(graph, qpIn)
+    val e2 = EdgeFetchStep(graph, qpOut)
+
+    val where = Where("_to = 20").get
+
+    val q =
+      v1.apply(vertices) // vertices: 4 - (1, 2, 3, 10)
+        .flatMap(e1) // edges: 4 - (srcId = 1, 2, 3 and tgtId = 10)
+        .filter(where.filter) // filterOut (only _to == 20)
+        .map(_.tgtForVertex) // vertices: (20)
+        .flatMap(v => e1.apply(v) ++ e2.apply(v)) // edges: (tgtId = 20)
+
+    val res = q.toBlocking.toList
+  }
+
+  test("Query to RxSteps") {
+    def q(id: Int) = Json.parse(
+      s"""
+        {
+          "srcVertices": [
+            { "serviceName": "$testServiceName",
+              "columnName": "$testColumnName",
+              "id": $id
+            }],
+          "steps": [
+            [{
+              "label": "$testLabelName",
+              "direction": "out",
+              "offset": 0,
+              "limit": 10
+            },
+            {
+              "label": "$testLabelName",
+              "direction": "in",
+              "offset": 0,
+              "limit": 10
+            }],
+
+            [{
+              "label": "$testLabelName",
+              "direction": "out",
+              "offset": 0,
+              "limit": 10,
+              "where": "weight > 10"
+            },
+            {
+              "label": "$testLabelName",
+              "direction": "in",
+              "offset": 0,
+              "limit": 10
+            }]
+          ]
+        }""")
+
+    val queryJs = q(1)
+    val requestParser = new RequestParser(graph)
+    val query = requestParser.toQuery(queryJs, None)
+
+    val actual = RxStep.toObservable(query)(graph).toBlocking.toList.sortBy(_.srcVertex.innerIdVal.toString)
+    val expected = Await.result(graph.getEdges(query), Duration("30 sec")).edgeWithScores.map(_.edge).sortBy(_.srcVertex.innerIdVal.toString)
+
+    actual shouldBe expected
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6b088944/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
new file mode 100644
index 0000000..f29a346
--- /dev/null
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
@@ -0,0 +1,78 @@
+package org.apache.s2graph.core.step
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+import rx.lang.scala.{Observable, Subscription}
+
+class StepTest extends FunSuite with Matchers {
+
+  trait GraphE {
+    def id: String
+  }
+
+  case class V(id: String) extends GraphE
+
+  case class E(id: String, src: V, tgt: V) extends GraphE
+
+  object GraphModels {
+    /**
+      * vertices: [A, B]
+      * edges: [E(A, B), E(B, A)]
+      */
+    val va = V("V_A")
+    val vb = V("V_B")
+
+    val e1 = E("E1", va, vb)
+    val e2 = E("E2", vb, va)
+
+    val allVertices = List(va, vb)
+    val allEdges = List(e1, e2)
+  }
+
+  case class VertexStep(vid: String) extends RxStep[Unit, V] {
+    override def apply(in: Unit): Observable[V] = {
+      val vertices = GraphModels.allVertices.filter(v => vid == v.id)
+      Observable.from(vertices)
+    }
+  }
+
+  case class EdgeStep(dir: String) extends RxStep[V, E] {
+    override def apply(in: V): Observable[E] = {
+      val edges = if (dir == "OUT") {
+        GraphModels.allEdges.filter(e => in == e.src)
+      } else {
+        GraphModels.allEdges.filter(e => in == e.tgt)
+      }
+
+      Observable.from(edges)
+    }
+  }
+
+  case class EdgeToVertexStep() extends RxStep[E, V] {
+    override def apply(in: E): Observable[V] = {
+      Observable.just(in.tgt)
+    }
+  }
+
+  test("basic step") {
+    val v1: RxStep[Unit, V] = VertexStep("V_A")
+
+    val e1: RxStep[V, E] = EdgeStep("OUT")
+    val e2 = EdgeStep("IN")
+
+    val g = v1(())
+      .flatMap(v => e1(v) ++ e2(v))
+      .flatMap(EdgeToVertexStep())
+      .flatMap(v => e1(v) ++ e2(v))
+      .distinct
+
+    val expected = List(
+      E("E1", V("V_A"), V("V_B")),
+      E("E2", V("V_B"), V("V_A"))
+    ).sortBy(_.id)
+
+    val actual = g.toBlocking.toList.sortBy(_.id)
+
+    println(actual)
+    actual shouldBe expected
+  }
+}


[2/2] incubator-s2graph git commit: [S2GRAPH-229] 'Step' abstraction for combinable queries

Posted by st...@apache.org.
[S2GRAPH-229] 'Step' abstraction for combinable queries

JIRA:
  [S2GRAPH-229] https://issues.apache.org/jira/browse/S2GRAPH-229

Pull Request:
  Closes #178

Author
  daewon <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/08d6a3ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/08d6a3ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/08d6a3ed

Branch: refs/heads/master
Commit: 08d6a3edd613656330dc2963f1fef71574d78c1b
Parents: 6b08894
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Jul 9 15:30:39 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Jul 9 15:30:39 2018 +0900

----------------------------------------------------------------------
 CHANGES                                          |  1 +
 .../org/apache/s2graph/core/step/IStep.scala     | 19 +++++++++++++++++++
 .../apache/s2graph/core/step/GraphStepTest.scala | 19 +++++++++++++++++++
 .../org/apache/s2graph/core/step/StepTest.scala  | 19 +++++++++++++++++++
 4 files changed, 58 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08d6a3ed/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index fead999..da32f18 100644
--- a/CHANGES
+++ b/CHANGES
@@ -104,6 +104,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-162] - Update year in the NOTICE file.
     * [S2GRAPH-211] - Include 's2jobs' test in CI
     * [S2GRAPH-212] - Fix broken markdown on README.md.
+    * [S2GRAPH-229] - 'Step' abstraction for combinable queries
 
 Release 0.1.0 - Released
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08d6a3ed/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
index e0e23f6..e700bf8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/step/IStep.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.core.step
 
 import org.apache.s2graph.core._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08d6a3ed/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
index 96e49a0..0096212 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/GraphStepTest.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.core.step
 
 import org.apache.s2graph.core.Integrate.IntegrateCommon

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08d6a3ed/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
index f29a346..3acc9d6 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/step/StepTest.scala
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.s2graph.core.step
 
 import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}