You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:52 UTC
[09/23] incubator-s2graph git commit: apply MutationHelper and
GraphElementBuilder.
apply MutationHelper and GraphElementBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/aa66822b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/aa66822b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/aa66822b
Branch: refs/heads/master
Commit: aa66822b08d0045e3870af2a9b82523947f553ce
Parents: 937b55a
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 21:41:57 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Nov 4 07:01:07 2017 +0900
----------------------------------------------------------------------
.../loader/subscriber/GraphSubscriber.scala | 2 +-
.../loader/subscriber/TransferToHFile.scala | 10 +-
.../s2graph/loader/subscriber/WalLogStat.scala | 3 +-
.../loader/subscriber/WalLogToHDFS.scala | 3 +-
.../scala/org/apache/s2graph/core/S2Graph.scala | 718 ++-----------------
.../org/apache/s2graph/core/S2GraphLike.scala | 238 ++++++
.../s2graph/core/features/S2Features.scala | 19 +
.../apache/s2graph/core/storage/Storage.scala | 112 +--
.../hbase/AsynchbaseStorageReadable.scala | 2 +-
.../tall/SnapshotEdgeDeserializable.scala | 2 +-
.../wide/SnapshotEdgeDeserializable.scala | 2 +-
.../core/storage/hbase/IndexEdgeTest.scala | 4 +-
12 files changed, 386 insertions(+), 729 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index 2352cdf..a371b6b 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka {
(statFunc: (String, Int) => Unit): Iterable[GraphElement] = {
(for (msg <- msgs) yield {
statFunc("total", 1)
- g.toGraphElement(msg, labelMapping) match {
+ g.elementBuilder.toGraphElement(msg, labelMapping) match {
case Some(e) if e.isInstanceOf[S2Edge] =>
statFunc("EdgeParseOk", 1)
e.asInstanceOf[S2Edge]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 618c1bd..a7b4e00 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -84,11 +84,11 @@ object TransferToHFile extends SparkApp {
} yield output
}
def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = {
- val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
+ val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList
kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
}
def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = {
- val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.toList
+ val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList
kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) }
}
def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = {
@@ -104,7 +104,7 @@ object TransferToHFile extends SparkApp {
val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs)
edge.edgesWithIndex.flatMap { indexEdge =>
- GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
+ GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv =>
new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp)
}
}
@@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp {
def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
val kvList = new java.util.ArrayList[KeyValue]
for (s <- strs) {
- val elementList = GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq
+ val elementList = GraphSubscriberHelper.g.elementBuilder.toGraphElement(s, labelMapping).toSeq
for (element <- elementList) {
if (element.isInstanceOf[S2Edge]) {
val edge = element.asInstanceOf[S2Edge]
@@ -136,7 +136,7 @@ object TransferToHFile extends SparkApp {
}
} else if (element.isInstanceOf[S2Vertex]) {
val vertex = element.asInstanceOf[S2Vertex]
- val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).vertexSerializer(vertex).toKeyValues.map { kv =>
+ val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv =>
new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp)
}
for (p <- putRequestList) {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index 40f936d..eca77f9 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -21,7 +21,6 @@ package org.apache.s2graph.loader.subscriber
import kafka.producer.KeyedMessage
import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.S2Graph$
import org.apache.s2graph.spark.spark.{WithKafka, SparkApp}
import org.apache.spark.streaming.Durations._
import org.apache.spark.streaming.kafka.HasOffsetRanges
@@ -69,7 +68,7 @@ object WalLogStat extends SparkApp with WithKafka {
val phase = System.getProperty("phase")
GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
partition.map { case (key, msg) =>
- GraphSubscriberHelper.g.toGraphElement(msg) match {
+ GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg) match {
case Some(elem) =>
val serviceName = elem.serviceName
msg.split("\t", 7) match {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index 23c3cda..605a994 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -22,7 +22,6 @@ package org.apache.s2graph.loader.subscriber
import java.text.SimpleDateFormat
import java.util.Date
import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.S2Graph$
import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.Durations._
@@ -92,7 +91,7 @@ object WalLogToHDFS extends SparkApp with WithKafka {
GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList)
partition.flatMap { case (key, msg) =>
- val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element =>
+ val optMsg = GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg).flatMap { element =>
val arr = msg.split("\t", 7)
val service = element.serviceName
val label = arr(5)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 82d0c6a..5e23f9b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -483,11 +483,11 @@ object S2Graph {
new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoTest", method="*", reason="no")
// all failed.
))
-class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph {
+class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike {
import S2Graph._
- private var apacheConfiguration: Configuration = _
+ var apacheConfiguration: Configuration = _
def dbSession() = scalikejdbc.AutoSession
@@ -575,6 +575,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val indexProvider = IndexProvider.apply(config)
+ val elementBuilder = new GraphElementBuilder(this)
+
def getStorage(service: Service): Storage = {
storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
}
@@ -639,8 +641,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
Try {
if (q.steps.isEmpty) fallback
else {
-
- val queryOption = q.queryOption
def fetch: Future[StepResult] = {
val startStepInnerResult = QueryResult.fromVertices(this, q)
q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) =>
@@ -795,7 +795,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
Query(vertices, Vector(step))
}
- // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) {
val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) {
fetchAndDeleteAll(queries, requestTs)
} { case (allDeleted, deleteSuccess) =>
@@ -832,16 +831,19 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
requestTs: Long): Future[(Boolean, Boolean)] = {
stepInnerResultLs.foreach { stepInnerResult =>
- logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}")
if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
}
val futures = for {
stepInnerResult <- stepInnerResultLs
- deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs)
- if deleteStepInnerResult.edgeWithScores.nonEmpty
+ filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
+ (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
+ }
+ edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs)
+ if edgesToDelete.nonEmpty
} yield {
- val head = deleteStepInnerResult.edgeWithScores.head
+ val head = edgesToDelete.head
val label = head.edge.innerLabel
+ val stepResult = StepResult(edgesToDelete, Nil, Nil, false)
val ret = label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
@@ -849,9 +851,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
* read: snapshotEdge on queryResult = O(N)
* write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
*/
- mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
+ mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
} else {
- deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
+ getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
case _ =>
@@ -859,7 +861,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
+ getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
ret
}
@@ -872,71 +874,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int): Future[Boolean] = {
- if (stepInnerResult.isEmpty) Future.successful(true)
- else {
- val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.innerLabel.hbaseZkAddr
- val futures = for {
- edgeWithScore <- stepInnerResult.edgeWithScores
- } yield {
- val edge = edgeWithScore.edge
- val score = edgeWithScore.score
-
- val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
- val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
- storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- storage.buildIncrementsAsync(indexEdge, -1L)
- }
-
- /* reverted direction */
- val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- storage.buildIncrementsAsync(indexEdge, -1L)
- }
-
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
- storage.writeToStorage(zkQuorum, mutations, withWait = true)
- }
-
- Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
- }
- }
-
- def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
- val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
- (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
- }
- if (filtered.isEmpty) StepResult.Empty
- else {
- val head = filtered.head
- val label = head.edge.innerLabel
- val edgeWithScoreLs = filtered.map { edgeWithScore =>
- val edge = edgeWithScore.edge
- val copiedEdge = label.consistencyLevel match {
- case "strong" =>
- edge.copyEdge(op = GraphUtil.operations("delete"),
- version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
- case _ =>
- edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
- }
-
- val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
- // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
- edgeToDelete
- }
- //Degree edge?
- StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false)
- }
- }
-
def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
@@ -967,8 +904,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
- // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
-
def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeWithIdxs = edges.zipWithIndex
@@ -985,20 +920,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val idxs = edgeGroup.map(_._2)
/* multiple edges with weak consistency level will be processed as batch */
- val mutations = edges.flatMap { edge =>
- val (_, edgeUpdate) =
- if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
- else S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
-
- if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
- storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
- }
-
- storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- idxs.map(idx => idx -> ret.isSuccess)
- }
+ storage.mutateWeakEdges(zkQuorum, edges, withWait)
}
Future.sequence(futures)
}
@@ -1012,7 +934,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
val storage = getStorage(label)
- mutateStrongEdges(storage)(edges, withWait = true).map { rets =>
+ val zkQuorum = label.hbaseZkAddr
+ storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
idxs.zip(rets)
}
}
@@ -1026,119 +949,24 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = {
-
- val edgeWithIdxs = _edges.zipWithIndex
- val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
- } toSeq
-
- val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
- val edges = edgeGroup.map(_._1)
- val idxs = edgeGroup.map(_._2)
- // After deleteAll, process others
- val mutateEdgeFutures = edges.toList match {
- case head :: tail =>
- val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait)
-
- //TODO: decide what we will do on failure on vertex put
- val puts = storage.buildVertexPutsAsync(head)
- val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
- Seq(edgeFuture, vertexFuture)
- case Nil => Nil
- }
-
- val composed = for {
- // deleteRet <- Future.sequence(deleteAllFutures)
- mutateRet <- Future.sequence(mutateEdgeFutures)
- } yield mutateRet
-
- composed.map(_.forall(_.isSuccess)).map { ret => idxs.map( idx => idx -> ret) }
- }
-
- Future.sequence(mutateEdges).map { squashedRets =>
- squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
- }
- }
-
-
- private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike],
- checkConsistency: Boolean,
- withWait: Boolean): Future[MutateResponse] = {
- assert(edges.nonEmpty)
- // TODO:: remove after code review: unreachable code
- if (!checkConsistency) {
-
- val zkQuorum = edges.head.innerLabel.hbaseZkAddr
- val futures = edges.map { edge =>
- val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
- val mutations =
- storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
- if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
- storage.writeToStorage(zkQuorum, mutations, withWait)
- }
- Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
- } else {
- storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
- storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
- }
- }
- }
-
def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
- def mutateVertex(storage: Storage)(vertex: S2VertexLike, withWait: Boolean): Future[MutateResponse] = {
- if (vertex.op == GraphUtil.operations("delete")) {
- storage.writeToStorage(vertex.hbaseZkAddr,
- storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
- } else if (vertex.op == GraphUtil.operations("deleteAll")) {
- logger.info(s"deleteAll for vertex is truncated. $vertex")
- Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
- } else {
- storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait)
- }
- }
-
- def mutateVertices(storage: Storage)(vertices: Seq[S2VertexLike],
- withWait: Boolean = false): Future[Seq[MutateResponse]] = {
- val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) }
+ def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
+ withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+ val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) }
Future.sequence(futures)
}
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
- mutateVertices(getStorage(service))(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+ mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
-
-
def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
- def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
- val futures = for {
- edge <- edges
- } yield {
- val kvs = for {
- relEdge <- edge.relatedEdges
- edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
- } yield {
- val countWithTs = edge.propertyValueInner(LabelMeta.count)
- val countVal = countWithTs.innerVal.toString().toLong
- storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head
- }
- storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait)
- }
-
- Future.sequence(futures)
- }
-
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
- incrementCounts(getStorage(label))(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
ls.flatten.toSeq.sortBy(_._2).map(_._1)
@@ -1147,11 +975,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
-
val storage = getStorage(label)
- val kvs = storage.buildDegreePuts(edge, degreeVal)
- storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
+ storage.updateDegree(label.hbaseZkAddr, edge, degreeVal)
}
def isRunning(): Boolean = running.get()
@@ -1164,166 +990,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
localLongId.set(0l)
}
- def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try {
- val parts = GraphUtil.split(s)
- val logType = parts(2)
- val element = if (logType == "edge" | logType == "e") {
- /* current only edge is considered to be bulk loaded */
- labelMapping.get(parts(5)) match {
- case None =>
- case Some(toReplace) =>
- parts(5) = toReplace
- }
- toEdge(parts)
- } else if (logType == "vertex" | logType == "v") {
- toVertex(parts)
- } else {
- throw new GraphExceptions.JsonParseException("log type is not exist in log.")
- }
-
- element
- } recover {
- case e: Exception =>
- logger.error(s"[toElement]: $e", e)
- None
- } get
-
-
- def toVertex(s: String): Option[S2VertexLike] = {
- toVertex(GraphUtil.split(s))
- }
-
- def toEdge(s: String): Option[S2EdgeLike] = {
- toEdge(GraphUtil.split(s))
- }
-
- def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
- val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val tempDirection = if (parts.length >= 8) parts(7) else "out"
- val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection
- val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation)
- Option(edge)
- } recover {
- case e: Exception =>
- logger.error(s"[toEdge]: $e", e)
- throw e
- } get
-
- def toVertex(parts: Array[String]): Option[S2VertexLike] = Try {
- val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
- val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
- val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
- Option(vertex)
- } recover {
- case e: Throwable =>
- logger.error(s"[toVertex]: $e", e)
- throw e
- } get
-
- def toEdge(srcId: Any,
- tgtId: Any,
- labelName: String,
- direction: String,
- props: Map[String, Any] = Map.empty,
- ts: Long = System.currentTimeMillis(),
- operation: String = "insert"): S2EdgeLike = {
- val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
-
- val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
- val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn
-
- val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion)
- val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion)
-
- val srcVertex = newVertex(SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis())
- val tgtVertex = newVertex(TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis())
- val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
-
- val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
- val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
- val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
- new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
- }
-
- def toVertex(serviceName: String,
- columnName: String,
- id: Any,
- props: Map[String, Any] = Map.empty,
- ts: Long = System.currentTimeMillis(),
- operation: String = "insert"): S2VertexLike = {
-
- val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found."))
- val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found."))
- val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
- val srcVertexId = id match {
- case vid: VertexId => id.asInstanceOf[VertexId]
- case _ => VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion))
- }
-
- val propsInner = column.propsToInnerVals(props) ++
- Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion))
-
- val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op)
- S2Vertex.fillPropsWithTs(vertex, propsInner)
- vertex
- }
-
- def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
- val srcVertex = queryRequest.vertex
- val queryParam = queryRequest.queryParam
- val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
- val label = queryParam.label
- val labelWithDir = queryParam.labelWithDir
- val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir)
- val propsWithTs = label.EmptyPropsWithTs
-
- tgtVertexIdOpt match {
- case Some(tgtVertexId) => // _to is given.
- /* we use toSnapshotEdge so dont need to swap src, tgt */
- val src = srcVertex.innerId
- val tgt = tgtVertexId
- val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt))
- val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId))
-
- newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
- case None =>
- val src = srcVertex.innerId
- val srcVId = SourceVertexId(srcColumn, src)
- val srcV = newVertex(srcVId)
-
- newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
- }
- }
- /**
- * helper to create new Edge instance from given parameters on memory(not actually stored in storage).
- *
- * Since we are using mutable map for property value(propsWithTs),
- * we should make sure that reference for mutable map never be shared between multiple Edge instances.
- * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph.
- *
- * Note that we are using following convention
- * 1. `add*` for method that actually store instance into storage,
- * 2. `new*` for method that only create instance on memory, but not store it into storage.
- *
- * @param srcVertex
- * @param tgtVertex
- * @param innerLabel
- * @param dir
- * @param op
- * @param version
- * @param propsWithTs
- * @param parentEdges
- * @param originalEdgeOpt
- * @param pendingEdgeOpt
- * @param statusCode
- * @param lockTs
- * @param tsInnerValOpt
- * @return
- */
def newEdge(srcVertex: S2VertexLike,
tgtVertex: S2VertexLike,
innerLabel: Label,
@@ -1336,89 +1003,22 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
pendingEdgeOpt: Option[S2EdgeLike] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
- tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
- val edge = S2Edge(
- this,
- srcVertex,
- tgtVertex,
- innerLabel,
- dir,
- op,
- version,
- S2Edge.EmptyProps,
- parentEdges,
- originalEdgeOpt,
- pendingEdgeOpt,
- statusCode,
- lockTs,
- tsInnerValOpt)
- S2Edge.fillPropsWithTs(edge, propsWithTs)
- edge
- }
-
- /**
- * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage).
- *
- * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method.
- * @param srcVertex
- * @param tgtVertex
- * @param label
- * @param dir
- * @param op
- * @param version
- * @param propsWithTs
- * @param pendingEdgeOpt
- * @param statusCode
- * @param lockTs
- * @param tsInnerValOpt
- * @return
- */
- private[core] def newSnapshotEdge(srcVertex: S2VertexLike,
- tgtVertex: S2VertexLike,
- label: Label,
- dir: Int,
- op: Byte,
- version: Long,
- propsWithTs: S2Edge.State,
- pendingEdgeOpt: Option[S2EdgeLike],
- statusCode: Byte = 0,
- lockTs: Option[Long],
- tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
- val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps,
- pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
- S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs)
- snapshotEdge
- }
-
- def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = {
- val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
- val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
- newVertexId(service, column, id)
- }
+ tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike =
+ elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs,
+ parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
- /**
- * helper to create S2Graph's internal VertexId instance with given parameters.
- * @param service
- * @param column
- * @param id
- * @return
- */
def newVertexId(service: Service,
column: ServiceColumn,
- id: Any): VertexId = {
- val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion)
- new VertexId(column, innerVal)
- }
+ id: Any): VertexId =
+ elementBuilder.newVertexId(service, column, id)
def newVertex(id: VertexId,
ts: Long = System.currentTimeMillis(),
props: S2Vertex.Props = S2Vertex.EmptyProps,
op: Byte = 0,
- belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
- val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds)
- S2Vertex.fillPropsWithTs(vertex, props)
- vertex
- }
+ belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike =
+ elementBuilder.newVertex(id, ts, props, op, belongLabelIds)
+
def getVertex(vertexId: VertexId): Option[S2VertexLike] = {
val v = newVertex(vertexId)
@@ -1441,248 +1041,26 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge)))
ls.iterator()
}
-// getEdges(query).map { stepResult =>
-// val ls = new util.ArrayList[Edge]()
-// stepResult.edgeWithScores.foreach(es => ls.add(es.edge))
-// ls.iterator()
-// }
- }
-
- /**
- * used by graph.traversal().V()
- * @param ids: array of VertexId values. note that last parameter can be used to control if actually fetch vertices from storage or not.
- * since S2Graph use user-provided id as part of edge, it is possible to
- * fetch edge without fetch start vertex. default is false which means we are not fetching vertices from storage.
- * @return
- */
- override def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
- val fetchVertices = ids.lastOption.map { lastParam =>
- if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
- else true
- }.getOrElse(true)
-
- if (ids.isEmpty) {
- //TODO: default storage need to be fixed.
- Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
- } else {
- val vertices = ids.collect {
- case s2Vertex: S2VertexLike => s2Vertex
- case vId: VertexId => newVertex(vId)
- case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId])
- case other @ _ => newVertex(VertexId.fromString(other.toString))
- }
-
- if (fetchVertices) {
- val future = getVertices(vertices).map { vs =>
- val ls = new util.ArrayList[structure.Vertex]()
- ls.addAll(vs)
- ls.iterator()
- }
- Await.result(future, WaitTimeout)
- } else {
- vertices.iterator
- }
- }
- }
-
- override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
- if (edgeIds.isEmpty) {
- // FIXME
- Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
- } else {
- Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
- }
- }
-
- def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
- val s2EdgeIds = edgeIds.collect {
- case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId]
- case id: EdgeId => id
- case s: String => EdgeId.fromString(s)
- }
- val edgesToFetch = for {
- id <- s2EdgeIds
- } yield {
- toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
- }
-
- checkEdges(edgesToFetch).map { stepResult =>
- val ls = new util.ArrayList[structure.Edge]
- stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
- ls.iterator()
- }
- }
- override def tx(): Transaction = {
- if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
- ???
- }
-
- override def variables(): Variables = new S2GraphVariables
-
- override def configuration(): Configuration = apacheConfiguration
-
- override def addVertex(label: String): Vertex = {
- if (label == null) throw Element.Exceptions.labelCanNotBeNull
- if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
-
- addVertex(Seq(T.label, label): _*)
- }
-
- def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
- idValue match {
- case vId: VertexId =>
- toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
- case _ =>
- val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
-
- val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
- val (serviceName, columnName) =
- if (names.length == 1) (DefaultServiceName, names(0))
- else throw new RuntimeException("malformed data on vertex label.")
-
- toVertex(serviceName, columnName, idValue, kvsMap)
- }
- }
-
- override def addVertex(kvs: AnyRef*): structure.Vertex = {
- if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) {
- throw Vertex.Exceptions.userSuppliedIdsNotSupported
- }
-
- val kvsMap = S2Property.kvsToProps(kvs)
- kvsMap.get(T.id.name()) match {
- case Some(idValue) if !S2Property.validType(idValue) =>
- throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
- case _ =>
- }
-
- kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
-
- if (kvsMap.contains(T.label.name()) && kvsMap(T.label.name).toString.isEmpty)
- throw Element.Exceptions.labelCanNotBeEmpty
-
- val vertex = kvsMap.get(T.id.name()) match {
- case None => // do nothing
- val id = nextLocalLongId
- makeVertex(Long.box(id), kvsMap)
- case Some(idValue) if S2Property.validType(idValue) =>
- makeVertex(idValue, kvsMap)
- case _ =>
- throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
- }
-
- addVertexInner(vertex)
-
- vertex
- }
-
- def addVertex(id: VertexId,
- ts: Long = System.currentTimeMillis(),
- props: S2Vertex.Props = S2Vertex.EmptyProps,
- op: Byte = 0,
- belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
- val vertex = newVertex(id, ts, props, op, belongLabelIds)
-
- val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
- if (rets.forall(_.isSuccess)) vertex
- else throw new RuntimeException("addVertex failed.")
- }
- Await.ready(future, WaitTimeout)
-
- vertex
- }
-
- def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
- val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
- if (rets.forall(_.isSuccess)) {
- indexProvider.mutateVerticesAsync(Seq(vertex))
- } else throw new RuntimeException("addVertex failed.")
- }
- Await.ready(future, WaitTimeout)
-
- vertex
- }
-
- /* tp3 only */
- def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
- val containsId = kvs.contains(T.id)
-
- tgtVertex match {
- case otherV: S2VertexLike =>
- if (!features().edge().supportsUserSuppliedIds() && containsId) {
- throw Exceptions.userSuppliedIdsNotSupported()
- }
-
- val props = S2Property.kvsToProps(kvs)
-
- props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
-
- //TODO: direction, operation, _timestamp need to be reserved property key.
-
- try {
- val direction = props.get("direction").getOrElse("out").toString
- val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
- val operation = props.get("operation").map(_.toString).getOrElse("insert")
- val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
- val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
- val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
- val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
- val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
-
- val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
-
- val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
- indexProvider.mutateEdgesAsync(Seq(edge))
- }
- Await.ready(future, WaitTimeout)
-
- edge
- } catch {
- case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
- }
- case null => throw new java.lang.IllegalArgumentException
- case _ => throw new RuntimeException("only S2Graph vertex can be used.")
- }
}
- override def close(): Unit = {
- shutdown()
- }
-
- override def compute[C <: GraphComputer](aClass: Class[C]): C = ???
-
- override def compute(): GraphComputer = {
- if (!features.graph.supportsComputer) {
- throw Graph.Exceptions.graphComputerNotSupported
- }
- ???
- }
-
- class S2GraphFeatures extends Features {
- import org.apache.s2graph.core.{features => FS}
- override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
-
- override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
-
- override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean =
- super.supports(featureClass, feature)
-
- override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
-
- override def toString: String = {
- s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
- }
- }
-
- private val s2Features = new S2GraphFeatures
-
- override def features() = s2Features
-
- override def toString(): String = "[s2graph]"
+ def toVertex(serviceName: String,
+ columnName: String,
+ id: Any,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): S2VertexLike =
+ elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation)
- override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
- builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
- }
+ def toEdge(srcId: Any,
+ tgtId: Any,
+ labelName: String,
+ direction: String,
+ props: Map[String, Any] = Map.empty,
+ ts: Long = System.currentTimeMillis(),
+ operation: String = "insert"): S2EdgeLike =
+ elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation)
+ def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] =
+ elementBuilder.toGraphElement(s, labelMapping)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
new file mode 100644
index 0000000..03a92c6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -0,0 +1,238 @@
+package org.apache.s2graph.core
+import java.util
+
+import org.apache.commons.configuration.Configuration
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
+import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
+import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.types.VertexId
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
+import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
+import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
+import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
+
+import scala.concurrent.{Await, Future}
+import scala.collection.JavaConversions._
+
+trait S2GraphLike extends Graph {
+ this: S2Graph =>
+
+ var apacheConfiguration: Configuration
+
+ private val s2Features = new S2Features
+
+ override def features() = s2Features
+
+ def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = {
+ val fetchVertices = ids.lastOption.map { lastParam =>
+ if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean]
+ else true
+ }.getOrElse(true)
+
+ if (ids.isEmpty) {
+ //TODO: default storage need to be fixed.
+ Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+ } else {
+ val vertices = ids.collect {
+ case s2Vertex: S2VertexLike => s2Vertex
+ case vId: VertexId => newVertex(vId)
+ case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId])
+ case other@_ => newVertex(VertexId.fromString(other.toString))
+ }
+
+ if (fetchVertices) {
+ val future = getVertices(vertices).map { vs =>
+ val ls = new util.ArrayList[structure.Vertex]()
+ ls.addAll(vs)
+ ls.iterator()
+ }
+ Await.result(future, WaitTimeout)
+ } else {
+ vertices.iterator
+ }
+ }
+ }
+
+ def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
+ if (edgeIds.isEmpty) {
+ // FIXME
+ Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
+ } else {
+ Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
+ }
+ }
+
+ def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
+ val s2EdgeIds = edgeIds.collect {
+ case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId]
+ case id: EdgeId => id
+ case s: String => EdgeId.fromString(s)
+ }
+ val edgesToFetch = for {
+ id <- s2EdgeIds
+ } yield {
+ elementBuilder.toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction)
+ }
+
+ checkEdges(edgesToFetch).map { stepResult =>
+ val ls = new util.ArrayList[structure.Edge]
+ stepResult.edgeWithScores.foreach { es => ls.add(es.edge) }
+ ls.iterator()
+ }
+ }
+
+ def tx(): Transaction = {
+ if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported
+ ???
+ }
+
+ def variables(): Variables = new S2GraphVariables
+
+ def configuration(): Configuration = apacheConfiguration
+
+ def addVertex(label: String): Vertex = {
+ if (label == null) throw Element.Exceptions.labelCanNotBeNull
+ if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty
+
+ addVertex(Seq(T.label, label): _*)
+ }
+
+ def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = {
+ idValue match {
+ case vId: VertexId =>
+ elementBuilder.toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap)
+ case _ =>
+ val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString
+
+ val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter)
+ val (serviceName, columnName) =
+ if (names.length == 1) (DefaultServiceName, names(0))
+ else throw new RuntimeException("malformed data on vertex label.")
+
+ elementBuilder.toVertex(serviceName, columnName, idValue, kvsMap)
+ }
+ }
+
+ def addVertex(kvs: AnyRef*): structure.Vertex = {
+ if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) {
+ throw Vertex.Exceptions.userSuppliedIdsNotSupported
+ }
+
+ val kvsMap = S2Property.kvsToProps(kvs)
+ kvsMap.get(T.id.name()) match {
+ case Some(idValue) if !S2Property.validType(idValue) =>
+ throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported()
+ case _ =>
+ }
+
+ kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+
+ if (kvsMap.contains(T.label.name()) && kvsMap(T.label.name).toString.isEmpty)
+ throw Element.Exceptions.labelCanNotBeEmpty
+
+ val vertex = kvsMap.get(T.id.name()) match {
+ case None => // do nothing
+ val id = nextLocalLongId
+ makeVertex(Long.box(id), kvsMap)
+ case Some(idValue) if S2Property.validType(idValue) =>
+ makeVertex(idValue, kvsMap)
+ case _ =>
+ throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported
+ }
+
+ addVertexInner(vertex)
+
+ vertex
+ }
+
+ def addVertex(id: VertexId,
+ ts: Long = System.currentTimeMillis(),
+ props: S2Vertex.Props = S2Vertex.EmptyProps,
+ op: Byte = 0,
+ belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = {
+ val vertex = newVertex(id, ts, props, op, belongLabelIds)
+
+ val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
+ if (rets.forall(_.isSuccess)) vertex
+ else throw new RuntimeException("addVertex failed.")
+ }
+ Await.ready(future, WaitTimeout)
+
+ vertex
+ }
+
+ def addVertexInner(vertex: S2VertexLike): S2VertexLike = {
+ val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
+ if (rets.forall(_.isSuccess)) {
+ indexProvider.mutateVerticesAsync(Seq(vertex))
+ } else throw new RuntimeException("addVertex failed.")
+ }
+ Await.ready(future, WaitTimeout)
+
+ vertex
+ }
+
+ /* tp3 only */
+ def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = {
+ val containsId = kvs.contains(T.id)
+
+ tgtVertex match {
+ case otherV: S2VertexLike =>
+ if (!features().edge().supportsUserSuppliedIds() && containsId) {
+ throw Exceptions.userSuppliedIdsNotSupported()
+ }
+
+ val props = S2Property.kvsToProps(kvs)
+
+ props.foreach { case (k, v) => S2Property.assertValidProp(k, v) }
+
+ //TODO: direction, operation, _timestamp need to be reserved property key.
+
+ try {
+ val direction = props.get("direction").getOrElse("out").toString
+ val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis())
+ val operation = props.get("operation").map(_.toString).getOrElse("insert")
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+ val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
+ val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts)
+ val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts)
+ val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+ val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs)
+
+ val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets =>
+ indexProvider.mutateEdgesAsync(Seq(edge))
+ }
+ Await.ready(future, WaitTimeout)
+
+ edge
+ } catch {
+ case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e)
+ }
+ case null => throw new java.lang.IllegalArgumentException
+ case _ => throw new RuntimeException("only S2Graph vertex can be used.")
+ }
+ }
+
+ def close(): Unit = {
+ shutdown()
+ }
+
+ def compute[C <: GraphComputer](aClass: Class[C]): C = ???
+
+ def compute(): GraphComputer = {
+ if (!features.graph.supportsComputer) {
+ throw Graph.Exceptions.graphComputerNotSupported
+ }
+ ???
+ }
+
+ def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = {
+ builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I]
+ }
+
+ override def toString(): String = "[s2graph]"
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala
new file mode 100644
index 0000000..36c9ecc
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala
@@ -0,0 +1,19 @@
+package org.apache.s2graph.core.features
+
+import org.apache.tinkerpop.gremlin.structure.Graph.Features
+
+class S2Features extends Features {
+ import org.apache.s2graph.core.{features => FS}
+ override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures
+
+ override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures
+
+ override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean =
+ super.supports(featureClass, feature)
+
+ override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures
+
+ override def toString: String = {
+ s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index e6075ec..2a8f1e2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -22,9 +22,10 @@ package org.apache.s2graph.core.storage
import com.typesafe.config.Config
import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper}
import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable
import org.apache.s2graph.core.types._
+
import scala.concurrent.{ExecutionContext, Future}
abstract class Storage(val graph: S2Graph,
@@ -61,49 +62,51 @@ abstract class Storage(val graph: S2Graph,
*/
lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
- /** IO **/
- def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
- serDe.snapshotEdgeSerializer(snapshotEdge)
-
- def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] =
- serDe.indexEdgeSerializer(indexEdge)
-
- def vertexSerializer(vertex: S2VertexLike): serde.Serializable[S2VertexLike] =
- serDe.vertexSerializer(vertex)
-
- def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] =
- serDe.snapshotEdgeDeserializer(schemaVer)
-
- def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable =
- serDe.indexEdgeDeserializer(schemaVer)
-
- def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike] =
- serDe.vertexDeserializer(schemaVer)
+ lazy val mutationHelper: MutationHelper = new MutationHelper(this)
+
+// /** IO **/
+// def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
+// serDe.snapshotEdgeSerializer(snapshotEdge)
+//
+// def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] =
+// serDe.indexEdgeSerializer(indexEdge)
+//
+// def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] =
+// serDe.vertexSerializer(vertex)
+//
+// def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] =
+// serDe.snapshotEdgeDeserializer(schemaVer)
+//
+// def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable =
+// serDe.indexEdgeDeserializer(schemaVer)
+//
+// def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] =
+// serDe.vertexDeserializer(schemaVer)
/** Mutation Builder */
- def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) =
- io.increments(edgeMutate)
-
- def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
- io.indexedEdgeMutations(edgeMutate)
-
- def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
- io.buildIncrementsAsync(indexedEdge, amount)
-
- def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
- io.buildIncrementsCountAsync(indexedEdge, amount)
-
- def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] =
- io.buildVertexPutsAsync(edge)
-
- def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
- io.snapshotEdgeMutations(edgeMutate)
-
- def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] =
- io.buildDegreePuts(edge, degreeVal)
-
- def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] =
- io.buildPutsAll(vertex)
+// def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) =
+// io.increments(edgeMutate)
+//
+// def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+// io.indexedEdgeMutations(edgeMutate)
+//
+// def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+// io.buildIncrementsAsync(indexedEdge, amount)
+//
+// def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
+// io.buildIncrementsCountAsync(indexedEdge, amount)
+//
+// def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] =
+// io.buildVertexPutsAsync(edge)
+//
+// def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
+// io.snapshotEdgeMutations(edgeMutate)
+//
+// def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] =
+// io.buildDegreePuts(edge, degreeVal)
+//
+// def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] =
+// io.buildPutsAll(vertex)
/** Mutation **/
@@ -129,8 +132,8 @@ abstract class Storage(val graph: S2Graph,
fetcher.fetchSnapshotEdgeInner(edge)
/** Conflict Resolver **/
- def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] =
- conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
+// def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] =
+// conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
/** Management **/
@@ -145,4 +148,25 @@ abstract class Storage(val graph: S2Graph,
def shutdown(): Unit = management.shutdown()
def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
+
+ def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
+ mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
+
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+ mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait)
+
+ def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
+ mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait)
+
+
+ def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
+ mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait)
+
+ def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
+ mutationHelper.incrementCounts(zkQuorum, edges, withWait)
+
+ def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
+ mutationHelper.updateDegree(zkQuorum, edge, degreeVal)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 6be5f60..bdb6e99 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -303,7 +303,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
val cacheTTL = queryParam.cacheTTLInMillis
/* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
- val edge = graph.toRequestEdge(queryRequest, parentEdges)
+ val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
val request = buildRequest(queryRequest, edge)
val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index b618962..408822a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -97,7 +97,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
Option(pendingEdge)
}
- val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+ val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 8c961ce..ff8eb80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -88,7 +88,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot
Option(pendingEdge)
}
- val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
+ val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
index 0cbaa81..24e98a1 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala
@@ -46,8 +46,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels {
val labelOpt = Option(l)
val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion)))
val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == LabelIndex.DefaultSeq).head
- val kvs = graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues
- val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None)
+ val kvs = graph.getStorage(l).serDe.indexEdgeSerializer(indexEdge).toKeyValues
+ val _indexEdgeOpt = graph.getStorage(l).serDe.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None)
_indexEdgeOpt should not be empty
edge == _indexEdgeOpt.get should be(true)