You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:29:02 UTC
[19/23] incubator-s2graph git commit: remove newVertex/newEdge/newVertexId on S2GraphLike.
remove newVertex/newEdge/newVertexId on S2GraphLike.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/eabe7570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/eabe7570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/eabe7570
Branch: refs/heads/master
Commit: eabe7570a9ef9ec0cb7b36d518d5bef4d1fd53dd
Parents: 6403bc9
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 10 22:57:51 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Nov 11 10:40:19 2017 +0900
----------------------------------------------------------------------
.../loader/subscriber/GraphSubscriber.scala | 67 +----
.../loader/subscriber/TransferToHFile.scala | 4 +-
.../org/apache/s2graph/core/QueryResult.scala | 2 +-
.../scala/org/apache/s2graph/core/S2Edge.scala | 8 +-
.../org/apache/s2graph/core/S2EdgeBuilder.scala | 2 +-
.../scala/org/apache/s2graph/core/S2Graph.scala | 256 ++-----------------
.../org/apache/s2graph/core/S2GraphLike.scala | 45 +---
.../org/apache/s2graph/core/S2GraphTp.scala | 98 +++++++
.../org/apache/s2graph/core/S2VertexLike.scala | 3 +-
.../apache/s2graph/core/TraversalHelper.scala | 153 ++++++++++-
.../s2graph/core/rest/RequestParser.scala | 2 +-
.../tall/IndexEdgeDeserializable.scala | 15 +-
.../wide/IndexEdgeDeserializable.scala | 15 +-
.../tall/SnapshotEdgeDeserializable.scala | 12 +-
.../wide/SnapshotEdgeDeserializable.scala | 13 +-
.../vertex/tall/VertexDeserializable.scala | 3 +-
.../vertex/wide/VertexDeserializable.scala | 3 +-
.../org/apache/s2graph/core/S2EdgeTest.scala | 24 +-
.../s2graph/core/TestCommonWithModels.scala | 3 +-
.../s2graph/core/parsers/WhereParserTest.scala | 22 +-
.../s2graph/core/storage/StorageIOTest.scala | 4 +-
.../core/storage/hbase/IndexEdgeTest.scala | 6 +-
.../process/S2GraphProcessStandardTest.scala | 4 +-
.../S2GraphStructureIntegrateTest.scala | 4 +-
.../S2GraphStructureStandardTest.scala | 4 +-
25 files changed, 354 insertions(+), 418 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index a371b6b..6ecb070 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -67,6 +67,7 @@ object GraphSubscriberHelper extends WithKafka {
var g: S2Graph = null
var management: Management = null
val conns = new scala.collection.mutable.HashMap[String, Connection]()
+ var builder: GraphElementBuilder = null
def toOption(s: String) = {
s match {
@@ -82,6 +83,7 @@ object GraphSubscriberHelper extends WithKafka {
val ec = ExecutionContext.Implicits.global
g = new S2Graph(config)(ec)
management = new Management(g)
+ builder = g.elementBuilder
}
}
@@ -106,7 +108,7 @@ object GraphSubscriberHelper extends WithKafka {
(statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
(for (msg <- msgs) yield {
statFunc("total", 1)
- g.elementBuilder.toGraphElement(msg, labelMapping) match {
+ builder.toGraphElement(msg, labelMapping) match {
case Some(e) if e.isInstanceOf[S2Edge] =>
statFunc("EdgeParseOk", 1)
e.asInstanceOf[S2Edge]
@@ -122,69 +124,6 @@ object GraphSubscriberHelper extends WithKafka {
}).toList
}
-// private def storeRec(zkQuorum: String, tableName: String, puts: List[Put], elementsSize: Int, tryNum: Int)
-// (statFunc: (String, Int) => Unit, statPrefix: String = "edge"): Unit = {
-// if (tryNum <= 0) {
-// statFunc("errorStore", elementsSize)
-// throw new RuntimeException(s"retry failed after $maxTryNum")
-// }
-// val conn = getConn(zkQuorum)
-// val mutator = conn.getBufferedMutator(TableName.valueOf(tableName))
-// // val table = conn.getTable(TableName.valueOf(tableName))
-// // table.setAutoFlush(false, false)
-//
-// try {
-// puts.foreach { put => put.setDurability(Durability.ASYNC_WAL) }
-// mutator.mutate(puts)
-// // table.put(puts)
-// statFunc(s"$statPrefix:storeOk", elementsSize)
-// } catch {
-// case e: Throwable =>
-// e.printStackTrace()
-// Thread.sleep(sleepPeriod)
-// storeRec(zkQuorum, tableName, puts, elementsSize, tryNum - 1)(statFunc)
-// } finally {
-// mutator.close()
-// // table.close()
-// }
-// }
-//
-// def storeDegreeBulk(zkQuorum: String, tableName: String)
-// (degrees: Iterable[(String, String, String, Int)], labelMapping: Map[String, String] = Map.empty)
-// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
-// val counts = HashMap[String, Long]()
-// val statFunc = storeStat(counts)(mapAccOpt) _
-//
-// for {
-// (vertexId, labelName, direction, degreeVal) <- degrees
-// incrementRequests <- TransferToHFile.buildDegreePutRequests(vertexId, labelName, direction, degreeVal)
-// } {
-// storeRec(zkQuorum, tableName, incrementRequests, degrees.size, maxTryNum)(statFunc, "degree")
-// }
-// counts
-// }
-// def storeBulk(zkQuorum: String, tableName: String)
-// (msgs: Seq[String], labelMapping: Map[String, String] = Map.empty, autoCreateEdge: Boolean = false)
-// (mapAccOpt: Option[HashMapAccumulable]): Iterable[(String, Long)] = {
-//
-// val counts = HashMap[String, Long]()
-// val statFunc = storeStat(counts)(mapAccOpt) _
-// val elements = toGraphElements(msgs, labelMapping)(statFunc)
-//
-// val puts = elements.flatMap { element =>
-// element match {
-// case v: Vertex if v.op == GraphUtil.operations("insert") || v.op == GraphUtil.operations("insertBulk") =>
-// v.buildPuts()
-// case e: Edge if e.op == GraphUtil.operations("insert") || e.op == GraphUtil.operations("insertBulk") =>
-// EdgeWriter(e).insertBulkForLoader(autoCreateEdge)
-// case _ => Nil
-// }
-// } toList
-//
-// storeRec(zkQuorum, tableName, puts, msgs.size, maxTryNum)(statFunc)
-// counts
-// }
-
def storeStat(counts: HashMap[String, Long])(mapAccOpt: Option[HashMapAccumulable])(key: String, value: Int) = {
counts.put(key, counts.getOrElse(key, 0L) + value)
mapAccOpt match {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index a7b4e00..6aaf6fd 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -97,11 +97,11 @@ object TransferToHFile extends SparkApp {
val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
throw new RuntimeException(s"$vertexId can not be converted into innerval")
}
- val vertex = GraphSubscriberHelper.g.newVertex(SourceVertexId(label.srcColumn, innerVal))
+ val vertex = GraphSubscriberHelper.builder.newVertex(SourceVertexId(label.srcColumn, innerVal))
val ts = System.currentTimeMillis()
val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))
- val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
+ val edge = GraphSubscriberHelper.builder.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
edge.edgesWithIndex.flatMap { indexEdge =>
GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 7d187c6..37ddf06 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -35,7 +35,7 @@ object QueryResult {
val propsWithTs = Map(LabelMeta.timestamp ->
InnerValLikeWithTs(InnerVal.withLong(currentTs, label.schemaVersion), currentTs))
- val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
+ val edge = graph.elementBuilder.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs)
val edgeWithScore = EdgeWithScore(edge, S2Graph.DefaultScore, queryParam.label)
edgeWithScore
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 3e33ed4..3eb1fa7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -817,10 +817,10 @@ object S2Edge {
val belongLabelIds = Seq(e.getLabelId())
if (e.getDir() == GraphUtil.directions("in")) {
val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
- e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+ e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
} else {
val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
- e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+ e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
}
}
@@ -828,10 +828,10 @@ object S2Edge {
val belongLabelIds = Seq(e.getLabelId())
if (e.getDir() == GraphUtil.directions("in")) {
val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn)
- e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
+ e.innerGraph.elementBuilder.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds)
} else {
val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn)
- e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
+ e.innerGraph.elementBuilder.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
index 4004a13..dc0baa2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala
@@ -80,7 +80,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) {
def updateTgtVertex(id: InnerValLike): S2EdgeLike = {
val newId = TargetVertexId(edge.tgtVertex.id.column, id)
- val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
+ val newTgtVertex = edge.innerGraph.elementBuilder.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props)
copyEdge(tgtVertex = newTgtVertex)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index f80d5c2..3905442 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.core
import java.util
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
@@ -96,6 +96,8 @@ object S2Graph {
val DefaultColumnName = "vertex"
val DefaultLabelName = "_s2graph"
+ var hbaseExecutor: ExecutorService = _
+
val graphStrategies: TraversalStrategies =
TraversalStrategies.GlobalCache.getStrategies(classOf[Graph]).addStrategies(S2GraphStepStrategy.instance)
@@ -130,7 +132,14 @@ object S2Graph {
logger.info(s"[InitStorage]: $storageBackend")
storageBackend match {
- case "hbase" => new AsynchbaseStorage(graph, config)
+ case "hbase" =>
+ hbaseExecutor =
+ if (config.getString("hbase.zookeeper.quorum") == "localhost")
+ AsynchbaseStorage.initLocalHBase(config)
+ else
+ null
+
+ new AsynchbaseStorage(graph, config)
case _ => throw new RuntimeException("not supported storage.")
}
}
@@ -150,94 +159,6 @@ object S2Graph {
}
}
-
-@Graph.OptIns(value = Array(
- new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD),
- new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
-))
-@Graph.OptOuts(value = Array(
- /* Process */
- /* branch: passed all. */
- /* filter */
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."),
-
- /* map */
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."),
-
- /* sideEffect */
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"),
-
- /* compliance */
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."),
-
- /* Structure */
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"),
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"),
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed.
-
- new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no")
-))
class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
var apacheConfiguration: Configuration = _
@@ -457,10 +378,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
} else {
val filterOutFuture = q.queryOption.filterOutQuery match {
case None => fallback
- case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
}
for {
- stepResult <- getEdgesStepInner(q)
+ stepResult <- traversalHelper.getEdgesStepInner(q)
filterOutInnerResult <- filterOutFuture
} yield {
if (filterOutInnerResult.isEmpty) stepResult
@@ -480,7 +401,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
else {
val filterOutFuture = mq.queryOption.filterOutQuery match {
case None => fallback
- case Some(filterOutQuery) => getEdgesStepInner(filterOutQuery)
+ case Some(filterOutQuery) => traversalHelper.getEdgesStepInner(filterOutQuery)
}
val multiQueryFutures = Future.sequence(mq.queries.map { query => getEdges(query) })
@@ -515,7 +436,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
}
val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
- fetchAndDeleteAll(queries, requestTs)
+ traversalHelper.fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
allDeleted && deleteSuccess
}.map { case (allDeleted, deleteSuccess) => allDeleted && deleteSuccess }
@@ -546,12 +467,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
}
override def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
- val v = newVertex(vertexId)
+ val v = elementBuilder.newVertex(vertexId)
Await.result(getVertices(Seq(v)).map { vertices => vertices.headOption }, WaitTimeout)
}
override def fetchEdges(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): util.Iterator[Edge] = {
- Await.result(fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
+ Await.result(traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs), WaitTimeout)
}
override def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = {
@@ -574,149 +495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
}
}
- fetchEdgesAsync(vertex, labelNameWithDirs.distinct)
+ traversalHelper.fetchEdgesAsync(vertex, labelNameWithDirs.distinct)
}
def isRunning(): Boolean = running.get()
-
- /** Private **/
-
- private def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
- Try {
- if (q.steps.isEmpty) fallback
- else {
- def fetch: Future[StepResult] = {
- val startStepInnerResult = QueryResult.fromVertices(this, q)
- q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
- for {
- prevStepInnerResult <- prevStepInnerResultFuture
- currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
- } yield {
- currentStepInnerResult.copy(
- accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
- failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
- )
- }
- }
- }
-
- fetch
- }
- } recover {
- case e: Exception =>
- logger.error(s"getEdgesAsync: $e", e)
- fallback
- } get
- }
-
- private def fetchStep(orgQuery: Query,
- stepIdx: Int,
- stepInnerResult: StepResult,
- buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
- if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
- else {
- val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
- traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)
-
- val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
-
- traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
- fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec)
- }
- }
-
- private def fetches(queryRequests: Seq[QueryRequest],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
-
- val reqWithIdxs = queryRequests.zipWithIndex
- val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
- val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
- for {
- prev <- prevFuture
- cur <- getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
- } yield {
- prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
- }
- }
- aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
- }
-
- private def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
- val futures = queries.map(getEdgesStepInner(_, true))
- val future = for {
- stepInnerResultLs <- Future.sequence(futures)
- (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
- } yield {
- // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
- (allDeleted, ret)
- }
-
- Extensions.retryOnFailure(MaxRetryNum) {
- future
- } {
- logger.error(s"fetch and deleteAll failed.")
- (true, false)
- }
-
- }
-
- private def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
- requestTs: Long): Future[(Boolean, Boolean)] = {
- stepInnerResultLs.foreach { stepInnerResult =>
- if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
- }
- val futures = for {
- stepInnerResult <- stepInnerResultLs
- filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
- (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
- }
- edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs)
- if edgesToDelete.nonEmpty
- } yield {
- val head = edgesToDelete.head
- val label = head.edge.innerLabel
- val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
- val ret = label.schemaVersion match {
- case HBaseType.VERSION3 | HBaseType.VERSION4 =>
- if (label.consistencyLevel == "strong") {
- /*
- * read: snapshotEdge on queryResult = O(N)
- * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
- */
- mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
- } else {
- getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
- }
- case _ =>
-
- /*
- * read: x
- * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
- */
- getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
- }
- ret
- }
-
- if (futures.isEmpty) {
- // all deleted.
- Future.successful(true -> true)
- } else {
- Future.sequence(futures).map { rets => false -> rets.forall(identity) }
- }
- }
-
- private def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = {
- val queryParams = labelNameWithDirs.map { case (l, direction) =>
- QueryParam(labelName = l, direction = direction.toLowerCase)
- }
-
- val query = Query.toQuery(Seq(vertex), queryParams)
- val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) }
- val ls = new util.ArrayList[Edge]()
- fetches(queryRequests, Map.empty).map { stepResultLs =>
- stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
- ls.iterator()
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index a58f1e0..1b80cb4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -58,8 +58,6 @@ trait S2GraphLike extends Graph {
override def features() = s2Features
- def nextLocalLongId = localLongId.getAndIncrement()
-
def fallback = Future.successful(StepResult.Empty)
def defaultStorage: Storage
@@ -103,34 +101,6 @@ trait S2GraphLike extends Graph {
def edgesAsync(vertex: S2VertexLike, direction: Direction, labelNames: String*): Future[util.Iterator[Edge]]
/** Convert to Graph Element **/
- def newEdge(srcVertex: S2VertexLike,
- tgtVertex: S2VertexLike,
- innerLabel: Label,
- dir: Int,
- op: Byte = GraphUtil.defaultOpByte,
- version: Long = System.currentTimeMillis(),
- propsWithTs: S2Edge.State,
- parentEdges: Seq[EdgeWithScore] = Nil,
- originalEdgeOpt: Option[S2EdgeLike] = None,
- pendingEdgeOpt: Option[S2EdgeLike] = None,
- statusCode: Byte = 0,
- lockTs: Option[Long] = None,
- tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike =
- elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs,
- parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-
- def newVertexId(service: Service,
- column: ServiceColumn,
- id: Any): VertexId =
- elementBuilder.newVertexId(service, column, id)
-
- def newVertex(id: VertexId,
- ts: Long = System.currentTimeMillis(),
- props: S2Vertex.Props = S2Vertex.EmptyProps,
- op: Byte = 0,
- belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike =
- elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
-
def toVertex(serviceName: String,
columnName: String,
id: Any,
@@ -261,7 +231,7 @@ trait S2GraphLike extends Graph {
val vertex = kvsMap.get(T.id.name()) match {
case None => // do nothing
- val id = nextLocalLongId
+ val id = localLongId.getAndIncrement()
makeVertex(Long.box(id), kvsMap)
case Some(idValue) if S2Property.validType(idValue) =>
makeVertex(idValue, kvsMap)
@@ -269,7 +239,7 @@ trait S2GraphLike extends Graph {
throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
}
- addVertexInner(vertex)
+ addVertex(vertex.id, vertex.ts, vertex.props, vertex.op, vertex.belongLabelIds)
vertex
}
@@ -290,17 +260,6 @@ trait S2GraphLike extends Graph {
vertex
}
- def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
- val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
- if (rets.forall(_.isSuccess)) {
- indexProvider.mutateVerticesAsync(Seq(vertex))
- } else throw new RuntimeException("addVertex failed.")
- }
- Await.ready(future, WaitTimeout)
-
- vertex
- }
-
/* tp3 only */
def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
val containsId = kvs.contains(T.id)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
new file mode 100644
index 0000000..5ce4086
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphTp.scala
@@ -0,0 +1,98 @@
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.tinkerpop.gremlin.structure.Graph
+
+import scala.concurrent.ExecutionContext
+
+
+@Graph.OptIns(value = Array(
+ new Graph.OptIn(value = Graph.OptIn.SUITE_PROCESS_STANDARD),
+ new Graph.OptIn(value = Graph.OptIn.SUITE_STRUCTURE_STANDARD)
+))
+@Graph.OptOuts(value = Array(
+ /* Process */
+ /* branch: passed all. */
+ /* filter */
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.filter.DropTest$Traversals", method = "g_V_properties_drop", reason = "please find bug on this case."),
+
+ /* map */
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_both_both_count", reason = "count differ very little. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX3X_count", reason = "count differ very little. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.CountTest$Traversals", method = "g_V_repeatXoutX_timesX8X_count", reason = "count differ very litter. fix this."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallback", reason = "NullPointerException. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profile", reason = "java.lang.AssertionError: There should be 3 top-level metrics. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "g_V_whereXinXcreatedX_count_isX1XX_name_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profileXmetricsX", reason = "expected 8049, actual 8046. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "grateful_V_out_out_profile", reason = "expected 8049, actual 8046. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profileXmetricsX", reason = "expected 2, actual 6. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "modern_V_out_out_profile", reason = "expected 2, actual 6. fix this."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest$Traversals", method = "testProfileStrategyCallbackSideEffect", reason = "NullPointerException. fix this."),
+
+ /* sideEffect */
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_repeatXbothEXcreatedX_subgraphXsgX_outVX_timesX5X_name_dedup", reason = "Expected 5, Actual 6."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest$Traversals", method = "g_V_withSideEffectXsgX_outEXknowsX_subgraphXsgX_name_capXsgX", reason = "Expected 3, Actual 6"),
+
+ /* compliance */
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.CoreTraversalTest", method = "shouldThrowExceptionWhenIdsMixed", reason = "VertexId is not Element."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest", method = "*", reason = "not supported yet."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnAddVWithGeneratedDefaultId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithGeneratedCustomId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest", method = "shouldGenerateDefaultIdOnGraphAddVWithSpecifiedId", reason = "GraphStep.processNextStart throw FastNoSuchElementException when GraphStep.start = true and GraphStep.end = true."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest", method = "*", reason = "not supported yet."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest", method = "*", reason = "not supported yet."),
+
+ /* Structure */
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateIdEquality", reason = "reference equals on EdgeId is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.EdgeTest$BasicEdgeTest", method = "shouldValidateEquality", reason = "reference equals on EdgeId is not supported."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.VertexTest$BasicVertexTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnEdge", reason = "S2Vertex.addEdge behave as upsert."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "reference equals is not supported."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPropertyTest", method = "shouldNotBeEqualPropertiesAsThereIsDifferentKey", reason = "reference equals is not supported."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveVertices", reason = "random label creation is not supported. all label need to be pre-configured."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldHaveExceptionConsistencyWhenAssigningSameIdOnVertex", reason = "Assigning the same ID to an Element update instead of throwing exception."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.GraphTest", method = "shouldRemoveEdges", reason = "random label creation is not supported. all label need to be pre-configured."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceEdgeTest", method = "shouldNotEvaluateToEqualDifferentId", reason = "Assigning the same ID to an Element update instead of throwing exception."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.5)", reason = "graphson-v2-embedded is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(NormalDistribution{stdDeviation=2.0, mean=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.1)", reason = "graphson-v2-embedded is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},PowerLawDistribution{gamma=2.4, multiplier=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.CommunityGeneratorTest$DifferentDistributionsTest", method = "shouldGenerateDifferentGraph", specific = "test(PowerLawDistribution{gamma=2.3, multiplier=0.0},NormalDistribution{stdDeviation=4.0, mean=0.0},0.25)", reason = "graphson-v2-embedded is not supported."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.algorithm.generator.DistributionGeneratorTest", method = "*", reason = "non-deterministic test."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GryoTest", method = "shouldSerializeTree", reason = "order of children is reversed. not sure why."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.SerializationTest$GraphSONTest", method = "shouldSerializeTraversalMetrics", reason = "expected 2, actual 3."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithBOTHEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithINEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexAsReferenceNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteVertexWithOUTEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoVertexTest", method = "shouldReadWriteDetachedVertexNoEdges", specific = "graphson-v2-embedded", reason = "Vertex.id() is deserialized as string, not class in graphson-v2-embedded."),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdgeAsReference", specific = "graphson-v2-embedded", reason = "no"),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteEdge", specific = "graphson-v2-embedded", reason = "no"),
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoEdgeTest", method = "shouldReadWriteDetachedEdge", specific = "graphson-v2-embedded", reason = "no"),
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoGraphTest", method = "*", reason = "no"), // all failed.
+
+ new Graph.OptOut(test = "org.apache.tinkerpop.gremlin.structure.io.IoTest", method = "*", reason = "no")
+))
+class S2GraphTp(config: Config)(override implicit val ec: ExecutionContext) extends S2Graph(config) {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index ad35efd..97e3095 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -118,7 +118,8 @@ trait S2VertexLike extends Vertex with GraphElement {
props.put(key, newProps)
// FIXME: save to persistent for tp test
- graph.addVertexInner(this)
+// graph.addVertexInner(this)
+ graph.addVertex(id, ts, props, op, belongLabelIds)
newProps
case _ => throw new RuntimeException("only single cardinality is supported.")
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index 25b909e..7a8e63e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -1,14 +1,17 @@
package org.apache.s2graph.core
+import java.util
+
import org.apache.s2graph.core.S2Graph.{FilterHashKey, HashKey}
import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection, VertexId}
-import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.types.{HBaseType, InnerVal, LabelWithDirection, VertexId}
+import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.apache.tinkerpop.gremlin.structure.Edge
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.Future
-import scala.util.Random
+import scala.util.{Random, Try}
object TraversalHelper {
@tailrec
@@ -125,6 +128,150 @@ object TraversalHelper {
class TraversalHelper(graph: S2GraphLike) {
import TraversalHelper._
+ implicit val ec = graph.ec
+ val MaxRetryNum = graph.config.getInt("max.retry.number")
+
+ def fallback = Future.successful(StepResult.Empty)
+
+ def getEdgesStepInner(q: Query, buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+ Try {
+ if (q.steps.isEmpty) fallback
+ else {
+ def fetch: Future[StepResult] = {
+ val startStepInnerResult = QueryResult.fromVertices(graph, q)
+ q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
+ for {
+ prevStepInnerResult <- prevStepInnerResultFuture
+ currentStepInnerResult <- fetchStep(q, stepIdx, prevStepInnerResult, buildLastStepInnerResult)
+ } yield {
+ currentStepInnerResult.copy(
+ accumulatedCursors = prevStepInnerResult.accumulatedCursors :+ currentStepInnerResult.cursors,
+ failCount = currentStepInnerResult.failCount + prevStepInnerResult.failCount
+ )
+ }
+ }
+ }
+
+ fetch
+ }
+ } recover {
+ case e: Exception =>
+ logger.error(s"getEdgesAsync: $e", e)
+ fallback
+ } get
+ }
+
+ def fetchStep(orgQuery: Query,
+ stepIdx: Int,
+ stepInnerResult: StepResult,
+ buildLastStepInnerResult: Boolean = false): Future[StepResult] = {
+ if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty)
+ else {
+ val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) =
+ graph.traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult)
+
+ val fetchedLs = fetches(queryRequests, prevStepTgtVertexIdEdges)
+
+ graph.traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests,
+ fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(graph.ec)
+ }
+ }
+
+ def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+
+ val reqWithIdxs = queryRequests.zipWithIndex
+ val requestsPerLabel = reqWithIdxs.groupBy(t => t._1.queryParam.label)
+ val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
+ for {
+ prev <- prevFuture
+ cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+ } yield {
+ prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
+ }
+ }
+ aggFuture.map { agg => agg.toSeq.sortBy(_._1).map(_._2) }
+ }
+
+ def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+ val futures = queries.map(getEdgesStepInner(_, true))
+ val future = for {
+ stepInnerResultLs <- Future.sequence(futures)
+ (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
+ } yield {
+ // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
+ (allDeleted, ret)
+ }
+
+ Extensions.retryOnFailure(MaxRetryNum) {
+ future
+ } {
+ logger.error(s"fetch and deleteAll failed.")
+ (true, false)
+ }
+
+ }
+
+ def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
+ requestTs: Long): Future[(Boolean, Boolean)] = {
+ stepInnerResultLs.foreach { stepInnerResult =>
+ if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
+ }
+ val futures = for {
+ stepInnerResult <- stepInnerResultLs
+ filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+ (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+ }
+ edgesToDelete = graph.elementBuilder.buildEdgesToDelete(filtered, requestTs)
+ if edgesToDelete.nonEmpty
+ } yield {
+ val head = edgesToDelete.head
+ val label = head.edge.innerLabel
+ val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
+ val ret = label.schemaVersion match {
+ case HBaseType.VERSION3 | HBaseType.VERSION4 =>
+ if (label.consistencyLevel == "strong") {
+ /*
+ * read: snapshotEdge on queryResult = O(N)
+ * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
+ */
+ graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
+ } else {
+ graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ }
+ case _ =>
+
+ /*
+ * read: x
+ * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
+ */
+ graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ }
+ ret
+ }
+
+ if (futures.isEmpty) {
+ // all deleted.
+ Future.successful(true -> true)
+ } else {
+ Future.sequence(futures).map { rets => false -> rets.forall(identity) }
+ }
+ }
+
+ def fetchEdgesAsync(vertex: S2VertexLike, labelNameWithDirs: Seq[(String, String)]): Future[util.Iterator[Edge]] = {
+ val queryParams = labelNameWithDirs.map { case (l, direction) =>
+ QueryParam(labelName = l, direction = direction.toLowerCase)
+ }
+
+ val query = Query.toQuery(Seq(vertex), queryParams)
+ val queryRequests = queryParams.map { param => QueryRequest(query, 0, vertex, param) }
+ val ls = new util.ArrayList[Edge]()
+ fetches(queryRequests, Map.empty).map { stepResultLs =>
+ stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
+ ls.iterator()
+ }
+ }
+
def buildNextStepQueryRequests(orgQuery: Query, stepIdx: Int, stepInnerResult: StepResult) = {
val edgeWithScoreLs = stepInnerResult.edgeWithScores
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 54b865a..55a3638 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -269,7 +269,7 @@ class RequestParser(graph: S2GraphLike) {
id <- ids
innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion)
} yield {
- graph.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
+ graph.elementBuilder.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
}
vertices
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 6ebc90c..945f246 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -36,6 +36,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+ val builder = graph.elementBuilder
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
@@ -61,7 +62,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
else {
val label = Label.findById(labelWithDir.labelId)
val schemaVer = label.schemaVersion
- val srcVertex = graph.newVertex(srcVertexId, version)
+ val srcVertex = builder.newVertex(srcVertexId, version)
var tsVal = version
val isTallSchema = tallSchemaVersions(label.schemaVersion)
val isDegree = if (isTallSchema) pos == kv.row.length else kv.qualifier.isEmpty
@@ -71,13 +72,13 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
// val degreeVal = Bytes.toLong(kv.value)
val degreeVal = bytesToLongFunc(kv.value, 0)
val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
- val tgtVertex = graph.newVertex(tgtVertexId, version)
- val edge = graph.newEdge(srcVertex, tgtVertex,
+ val tgtVertex = builder.newVertex(tgtVertexId, version)
+ val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex,
label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
edge.propertyInner(LabelMeta.timestamp.name, version, version)
edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
- edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.tgtVertex = builder.newVertex(tgtVertexId, version)
edge.setOp(GraphUtil.defaultOpByte)
edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
@@ -113,8 +114,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
else kv.qualifier(kv.qualifier.length - 1)
}
- val tgtVertex = graph.newVertex(tgtVertexIdRaw, version)
- val edge = graph.newEdge(srcVertex, tgtVertex,
+ val tgtVertex = builder.newVertex(tgtVertexIdRaw, version)
+ val edge = graph.elementBuilder.newEdge(srcVertex, tgtVertex,
label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
@@ -150,7 +151,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
if (edge.checkProperty(LabelMeta.to.name)) {
val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
- val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
+ val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
edge.setTgtVertex(tgtVertex)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 79e3f2e..e533b4b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -32,6 +32,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+ val builder = graph.elementBuilder
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
@@ -55,20 +56,20 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
else {
val label = Label.findById(labelWithDir.labelId)
val schemaVer = label.schemaVersion
- val srcVertex = graph.newVertex(srcVertexId, version)
+ val srcVertex = builder.newVertex(srcVertexId, version)
//TODO:
var tsVal = version
if (kv.qualifier.isEmpty) {
val degreeVal = bytesToLongFunc(kv.value, 0)
val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
- val tgtVertex = graph.newVertex(tgtVertexId, version)
- val edge = graph.newEdge(srcVertex, tgtVertex,
+ val tgtVertex = builder.newVertex(tgtVertexId, version)
+ val edge = builder.newEdge(srcVertex, tgtVertex,
label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
edge.propertyInner(LabelMeta.timestamp.name, version, version)
edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
- edge.tgtVertex = graph.newVertex(tgtVertexId, version)
+ edge.tgtVertex = builder.newVertex(tgtVertexId, version)
edge.setOp(GraphUtil.defaultOpByte)
edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
@@ -88,8 +89,8 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
if (kv.qualifier.length == pos) GraphUtil.defaultOpByte
else kv.qualifier(kv.qualifier.length-1)
- val tgtVertex = graph.newVertex(tgtVertexIdRaw, version)
- val edge = graph.newEdge(srcVertex, tgtVertex,
+ val tgtVertex = builder.newVertex(tgtVertexIdRaw, version)
+ val edge = builder.newEdge(srcVertex, tgtVertex,
label, labelWithDir.dir, GraphUtil.defaultOpByte, version, S2Edge.EmptyState)
val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
@@ -124,7 +125,7 @@ class IndexEdgeDeserializable(graph: S2GraphLike,
/* process tgtVertexId */
if (edge.checkProperty(LabelMeta.to.name)) {
val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
- val tgtVertex = graph.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
+ val tgtVertex = builder.newVertex(TargetVertexId(ServiceColumn.Default, vId.innerVal), version)
edge.setTgtVertex(tgtVertex)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 4c97f6e..b7f5ba1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -28,7 +28,7 @@ import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.serde.Deserializable
class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] {
-
+ val builder = graph.elementBuilder
def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
val statusCode = byte >> 4
val op = byte & ((1 << 4) - 1)
@@ -89,15 +89,19 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
- graph.newEdge(graph.newVertex(srcVertexId, version),
- graph.newVertex(tgtVertexId, version),
+ builder.newEdge(
+ builder.newVertex(srcVertexId, version),
+ builder.newVertex(tgtVertexId, version),
label, labelWithDir.dir, pendingEdgeOp,
version, pendingEdgeProps.toMap,
statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
+
Option(pendingEdge)
}
- val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+ val snapshotEdge = builder.newSnapshotEdge(
+ builder.newVertex(srcVertexId, ts),
+ builder.newVertex(tgtVertexId, ts),
label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index a6a64a2..7dec6d9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -28,7 +28,7 @@ import org.apache.s2graph.core._
import org.apache.s2graph.core.storage.serde.Deserializable
class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[SnapshotEdge] {
-
+ val builder = graph.elementBuilder
def statusCodeWithOp(byte: Byte): (Byte, Byte) = {
val statusCode = byte >> 4
val op = byte & ((1 << 4) - 1)
@@ -55,7 +55,7 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
else {
val label = Label.findById(labelWithDir.labelId)
val schemaVer = label.schemaVersion
- val srcVertex = graph.newVertex(srcVertexId, version)
+ val srcVertex = builder.newVertex(srcVertexId, version)
val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
@@ -80,15 +80,18 @@ class SnapshotEdgeDeserializable(graph: S2GraphLike) extends Deserializable[Snap
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
- graph.newEdge(graph.newVertex(srcVertexId, version),
- graph.newVertex(tgtVertexId, version),
+ builder.newEdge(
+ builder.newVertex(srcVertexId, version),
+ builder.newVertex(tgtVertexId, version),
label, labelWithDir.dir, pendingEdgeOp,
version, pendingEdgeProps.toMap,
statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
Option(pendingEdge)
}
- val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+ val snapshotEdge = builder.newSnapshotEdge(
+ builder.newVertex(srcVertexId, ts),
+ builder.newVertex(tgtVertexId, ts),
label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
index 70160a8..5576017 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/tall/VertexDeserializable.scala
@@ -28,6 +28,7 @@ import org.apache.s2graph.core.{S2Graph, S2GraphLike, S2Vertex, S2VertexLike}
class VertexDeserializable(graph: S2GraphLike,
bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] {
+ val builder = graph.elementBuilder
def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = {
try {
@@ -47,7 +48,7 @@ class VertexDeserializable(graph: S2GraphLike,
propsMap += (columnMeta -> innerVal)
}
- val vertex = graph.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil)
+ val vertex = builder.newVertex(vertexId, kv.timestamp, S2Vertex.EmptyProps, belongLabelIds = Nil)
S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
Option(vertex)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
index 002f577..d1d4d7d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/wide/VertexDeserializable.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable.ListBuffer
class VertexDeserializable(graph: S2GraphLike,
bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[S2VertexLike] {
+ val builder = graph.elementBuilder
def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
cacheElementOpt: Option[S2VertexLike]): Option[S2VertexLike] = {
try {
@@ -62,7 +63,7 @@ class VertexDeserializable(graph: S2GraphLike,
}
}
assert(maxTs != Long.MinValue)
- val vertex = graph.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds)
+ val vertex = builder.newVertex(vertexId, maxTs, S2Vertex.EmptyProps, belongLabelIds = belongLabelIds)
S2Vertex.fillPropsWithTs(vertex, propsMap.toMap)
Option(vertex)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
index 94883c9..2b439bc 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/S2EdgeTest.scala
@@ -66,14 +66,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
test("buildOperation") {
val schemaVersion = "v2"
val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
- val srcVertex = graph.newVertex(vertexId)
+ val srcVertex = builder.newVertex(vertexId)
val tgtVertex = srcVertex
val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
val snapshotEdge = None
val propsWithTs = Map(timestampProp)
- val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+ val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
val newVersion = 0L
@@ -93,14 +93,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
test("buildMutation: snapshotEdge: None with newProps") {
val schemaVersion = "v2"
val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
- val srcVertex = graph.newVertex(vertexId)
+ val srcVertex = builder.newVertex(vertexId)
val tgtVertex = srcVertex
val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
val snapshotEdge = None
val propsWithTs = Map(timestampProp)
- val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+ val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
val newVersion = 0L
@@ -120,14 +120,14 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
val schemaVersion = "v2"
val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
- val srcVertex = graph.newVertex(vertexId)
+ val srcVertex = builder.newVertex(vertexId)
val tgtVertex = srcVertex
val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
val snapshotEdge = None
val propsWithTs = Map(timestampProp)
- val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+ val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
val newVersion = 0L
@@ -144,7 +144,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
val schemaVersion = "v2"
val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
- val srcVertex = graph.newVertex(vertexId)
+ val srcVertex = builder.newVertex(vertexId)
val tgtVertex = srcVertex
val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -159,12 +159,12 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3)
)
- val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
+ val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
val snapshotEdge = Option(_snapshotEdge)
- val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+ val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
val newVersion = 0L
val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
@@ -178,7 +178,7 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
val schemaVersion = "v2"
val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
- val srcVertex = graph.newVertex(vertexId)
+ val srcVertex = builder.newVertex(vertexId)
val tgtVertex = srcVertex
val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -193,11 +193,11 @@ class S2EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
LabelMeta.lastDeletedAt -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 3)
)
- val _snapshotEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
+ val _snapshotEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, op = GraphUtil.operations("delete"), propsWithTs = propsWithTs)
val snapshotEdge = Option(_snapshotEdge)
- val requestEdge = graph.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
+ val requestEdge = builder.newEdge(srcVertex, tgtVertex, labelV2, labelWithDirV2.dir, propsWithTs = propsWithTs)
val newVersion = 0L
val edgeMutate = S2Edge.buildMutation(snapshotEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/eabe7570/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
index 4614bed..6ac77e4 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/TestCommonWithModels.scala
@@ -35,12 +35,13 @@ trait TestCommonWithModels {
var graph: S2Graph = _
var config: Config = _
var management: Management = _
+ var builder: GraphElementBuilder = _
def initTests() = {
config = ConfigFactory.load()
graph = new S2Graph(config)(ExecutionContext.Implicits.global)
management = new Management(graph)
-
+ builder = graph.elementBuilder
implicit val session = AutoSession
deleteTestLabel()