You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:47 UTC
[04/23] incubator-s2graph git commit: passed s2tests.
passed s2tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/87394b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/87394b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/87394b9f
Branch: refs/heads/master
Commit: 87394b9f7b7e241642201460a460ba2403a0fb99
Parents: 7413aad
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 15:28:08 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 3 15:28:08 2017 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/QueryParam.scala | 10 +--
.../org/apache/s2graph/core/QueryResult.scala | 2 +-
.../scala/org/apache/s2graph/core/S2Edge.scala | 68 +++++++++-----------
.../org/apache/s2graph/core/S2EdgeLike.scala | 54 +++++++++++++---
.../scala/org/apache/s2graph/core/S2Graph.scala | 50 +++++++-------
.../org/apache/s2graph/core/S2Property.scala | 2 +-
.../org/apache/s2graph/core/S2VertexLike.scala | 2 +-
.../s2graph/core/index/IndexProvider.scala | 12 ++--
.../s2graph/core/parsers/WhereParser.scala | 34 +++++-----
.../s2graph/core/rest/RequestParser.scala | 4 +-
.../apache/s2graph/core/storage/Storage.scala | 10 +--
.../apache/s2graph/core/storage/StorageIO.scala | 20 +++---
.../s2graph/core/storage/StorageReadable.scala | 6 +-
.../storage/WriteWriteConflictResolver.scala | 34 +++++-----
.../hbase/AsynchbaseStorageReadable.scala | 6 +-
.../tall/IndexEdgeDeserializable.scala | 17 +++--
.../wide/IndexEdgeDeserializable.scala | 12 ++--
.../tall/SnapshotEdgeSerializable.scala | 2 +-
.../wide/SnapshotEdgeSerializable.scala | 2 +-
.../s2graph/core/parsers/WhereParserTest.scala | 2 +-
.../core/tinkerpop/S2GraphProvider.scala | 1 +
.../rest/play/controllers/EdgeController.scala | 6 +-
22 files changed, 195 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 2e8d1f4..e98ef37 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -163,7 +163,7 @@ case class EdgeTransformer(jsValue: JsValue) {
}
}
- def toInnerValOpt(queryParam: QueryParam, edge: S2Edge, fieldName: String): Option[InnerValLike] = {
+ def toInnerValOpt(queryParam: QueryParam, edge: S2EdgeLike, fieldName: String): Option[InnerValLike] = {
fieldName match {
case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
case LabelMeta.from.name => Option(edge.srcVertex.innerId)
@@ -171,7 +171,7 @@ case class EdgeTransformer(jsValue: JsValue) {
}
}
- def transform(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
+ def transform(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
if (isDefault) Seq(edge)
else {
val edges = for {
@@ -311,7 +311,7 @@ case class QueryParam(labelName: String,
CanInnerValLike.anyToInnerValLike.toInnerVal(id)(label.tgtColumnWithDir(dir).schemaVersion)
}
- def buildInterval(edgeOpt: Option[S2Edge]) = intervalOpt match {
+ def buildInterval(edgeOpt: Option[S2EdgeLike]) = intervalOpt match {
case None => Array.empty[Byte] -> Array.empty[Byte]
case Some(interval) =>
val (froms, tos) = interval
@@ -359,7 +359,7 @@ case class QueryParam(labelName: String,
Bytes.add(bytes, optionalCacheKey)
}
- private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2Edge]): Seq[(LabelMeta, InnerValLike)] = {
+ private def convertToInner(kvs: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike]): Seq[(LabelMeta, InnerValLike)] = {
kvs.map { case (propKey, propValJs) =>
propValJs match {
case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") =>
@@ -392,7 +392,7 @@ case class QueryParam(labelName: String,
}
}
- def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2Edge] = None) = {
+ def paddingInterval(len: Byte, froms: Seq[(String, JsValue)], tos: Seq[(String, JsValue)], edgeOpt: Option[S2EdgeLike] = None) = {
val fromInnerVal = convertToInner(froms, edgeOpt)
val toInnerVal = convertToInner(tos, edgeOpt)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index 7506b40..9bd3cdb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -91,7 +91,7 @@ object WithScore {
}
}
-case class EdgeWithScore(edge: S2Edge,
+case class EdgeWithScore(edge: S2EdgeLike,
score: Double,
label: Label,
orderByValues: (Any, Any, Any, Any) = StepResult.EmptyOrderByValues,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 97abd26..3529991 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -71,7 +71,7 @@ case class SnapshotEdge(graph: S2Graph,
op: Byte,
version: Long,
private val propsWithTs: Props,
- pendingEdgeOpt: Option[S2Edge],
+ pendingEdgeOpt: Option[S2EdgeLike],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None) {
@@ -89,7 +89,7 @@ case class SnapshotEdge(graph: S2Graph,
def allPropsDeleted = S2Edge.allPropsDeleted(propsWithTs)
- def toEdge: S2Edge = {
+ def toEdge: S2EdgeLike = {
S2Edge(graph, srcVertex, tgtVertex, label, dir, op,
version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
@@ -243,7 +243,7 @@ case class IndexEdge(graph: S2Graph,
} yield meta.name -> jsValue
- def toEdge: S2Edge = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ def toEdge: S2EdgeLike = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
// only for debug
def toLogString() = {
@@ -310,11 +310,11 @@ case class S2Edge(override val innerGraph: S2Graph,
var version: Long = System.currentTimeMillis(),
override val propsWithTs: Props = S2Edge.EmptyProps,
override val parentEdges: Seq[EdgeWithScore] = Nil,
- override val originalEdgeOpt: Option[S2Edge] = None,
- override val pendingEdgeOpt: Option[S2Edge] = None,
+ override val originalEdgeOpt: Option[S2EdgeLike] = None,
+ override val pendingEdgeOpt: Option[S2EdgeLike] = None,
override val statusCode: Byte = 0,
override val lockTs: Option[Long] = None,
- var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike with GraphElement {
+ var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike {
// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
// assert(propsWithTs.contains(LabelMeta.timeStampSeq))
@@ -335,10 +335,10 @@ case class S2Edge(override val innerGraph: S2Graph,
// }
- def toLogString: String = {
- // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
- }
+// def toLogString: String = {
+// // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+// List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
+// }
override def hashCode(): Int = {
id().hashCode()
@@ -455,7 +455,7 @@ object S2Edge {
type State = Map[LabelMeta, InnerValLikeWithTs]
type PropsPairWithTs = (State, State, Long, String)
type MergeState = PropsPairWithTs => (State, Boolean)
- type UpdateFunc = (Option[S2Edge], S2Edge, MergeState)
+ type UpdateFunc = (Option[S2EdgeLike], S2EdgeLike, MergeState)
def EmptyProps = new java.util.HashMap[String, S2Property[_]]
def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
@@ -490,7 +490,7 @@ object S2Edge {
def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
}
- def fillPropsWithTs(edge: S2Edge, state: State): Unit = {
+ def fillPropsWithTs(edge: S2EdgeLike, state: State): Unit = {
state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) }
}
@@ -500,7 +500,7 @@ object S2Edge {
}.toMap
}
- def stateToProps(edge: S2Edge, state: State): Props = {
+ def stateToProps(edge: S2EdgeLike, state: State): Props = {
state.foreach { case (k, v) =>
edge.propertyInner(k.name, v.innerVal.value, v.ts)
}
@@ -533,7 +533,7 @@ object S2Edge {
ret
}
- def buildDeleteBulk(invertedEdge: Option[S2Edge], requestEdge: S2Edge): (S2Edge, EdgeMutate) = {
+ def buildDeleteBulk(invertedEdge: Option[S2EdgeLike], requestEdge: S2EdgeLike): (S2EdgeLike, EdgeMutate) = {
// assert(invertedEdge.isEmpty)
// assert(requestEdge.op == GraphUtil.operations("delete"))
@@ -543,7 +543,7 @@ object S2Edge {
(requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
}
- def buildOperation(invertedEdge: Option[S2Edge], requestEdges: Seq[S2Edge]): (S2Edge, EdgeMutate) = {
+ def buildOperation(invertedEdge: Option[S2EdgeLike], requestEdges: Seq[S2EdgeLike]): (S2EdgeLike, EdgeMutate) = {
// logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
// logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
val oldPropsWithTs =
@@ -551,21 +551,21 @@ object S2Edge {
else propsToState(invertedEdge.get.propsWithTs)
val funcs = requestEdges.map { edge =>
- if (edge.op == GraphUtil.operations("insert")) {
+ if (edge.getOp() == GraphUtil.operations("insert")) {
edge.innerLabel.consistencyLevel match {
case "strong" => S2Edge.mergeUpsert _
case _ => S2Edge.mergeInsertBulk _
}
- } else if (edge.op == GraphUtil.operations("insertBulk")) {
+ } else if (edge.getOp() == GraphUtil.operations("insertBulk")) {
S2Edge.mergeInsertBulk _
- } else if (edge.op == GraphUtil.operations("delete")) {
+ } else if (edge.getOp() == GraphUtil.operations("delete")) {
edge.innerLabel.consistencyLevel match {
case "strong" => S2Edge.mergeDelete _
case _ => throw new RuntimeException("not supported")
}
}
- else if (edge.op == GraphUtil.operations("update")) S2Edge.mergeUpdate _
- else if (edge.op == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
+ else if (edge.getOp() == GraphUtil.operations("update")) S2Edge.mergeUpdate _
+ else if (edge.getOp() == GraphUtil.operations("increment")) S2Edge.mergeIncrement _
else throw new RuntimeException(s"not supported operation on edge: $edge")
}
@@ -587,7 +587,7 @@ object S2Edge {
}
val requestTs = requestEdge.ts
/* version should be monotonically increasing so our RPC mutation should be applied safely */
- val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
+ val newVersion = invertedEdge.map(e => e.getVersion() + incrementVersion).getOrElse(requestTs)
val maxTs = prevPropsWithTs.map(_._2.ts).max
val newTs = if (maxTs > requestTs) maxTs else requestTs
val propsWithTs = prevPropsWithTs ++
@@ -597,14 +597,14 @@ object S2Edge {
// logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
// logger.error(s"$propsWithTs")
- val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
- fillPropsWithTs(newEdge, propsWithTs)
+ val newEdge =requestEdge.copyEdgeWithState(propsWithTs)
+
(newEdge, edgeMutate)
}
}
- def buildMutation(snapshotEdgeOpt: Option[S2Edge],
- requestEdge: S2Edge,
+ def buildMutation(snapshotEdgeOpt: Option[S2EdgeLike],
+ requestEdge: S2EdgeLike,
newVersion: Long,
oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
@@ -615,14 +615,14 @@ object S2Edge {
} else {
val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
val newOp = snapshotEdgeOpt match {
- case None => requestEdge.op
+ case None => requestEdge.getOp()
case Some(old) =>
val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
- if (oldMaxTs > requestEdge.ts) old.op
- else requestEdge.op
+ if (oldMaxTs > requestEdge.ts) old.getOp()
+ else requestEdge.getOp()
}
- val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
+ val newSnapshotEdge = requestEdge.copyOp(newOp).copyVersion(newVersion).copyEdgeWithState(newPropsWithTs)
val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
// delete request must always update snapshot.
@@ -631,8 +631,8 @@ object S2Edge {
EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
} else {
val edgesToDelete = snapshotEdgeOpt match {
- case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
- snapshotEdge.copy(op = GraphUtil.defaultOpByte)
+ case Some(snapshotEdge) if snapshotEdge.getOp() != GraphUtil.operations("delete") =>
+ snapshotEdge.copyOp(GraphUtil.defaultOpByte)
.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
case _ => Nil
}
@@ -640,11 +640,7 @@ object S2Edge {
val edgesToInsert =
if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
else {
- val newEdge = requestEdge.copy(
- version = newVersion,
- propsWithTs = S2Edge.EmptyProps,
- op = GraphUtil.defaultOpByte
- )
+ val newEdge = requestEdge.copyOp(GraphUtil.defaultOpByte).copyVersion(newVersion).copyEdgeWithState(S2Edge.EmptyState)
newPropsWithTs.foreach { case (k, v) => newEdge.propertyInner(k.name, v.innerVal.value, v.ts) }
newEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
index 04ed7ab..bb58554 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -14,7 +14,7 @@ import play.api.libs.json.Json
import scala.concurrent.Await
import scala.collection.JavaConverters._
-trait S2EdgeLike extends Edge {
+trait S2EdgeLike extends Edge with GraphElement {
this: S2Edge =>
val innerGraph: S2Graph
@@ -27,8 +27,8 @@ trait S2EdgeLike extends Edge {
// var version: Long = System.currentTimeMillis()
val propsWithTs: Props = S2Edge.EmptyProps
val parentEdges: Seq[EdgeWithScore] = Nil
- val originalEdgeOpt: Option[S2Edge] = None
- val pendingEdgeOpt: Option[S2Edge] = None
+ val originalEdgeOpt: Option[S2EdgeLike] = None
+ val pendingEdgeOpt: Option[S2EdgeLike] = None
val statusCode: Byte = 0
val lockTs: Option[Long] = None
// var tsInnerValOpt: Option[InnerValLike] = None
@@ -48,6 +48,13 @@ trait S2EdgeLike extends Edge {
lazy val labelName = innerLabel.label
lazy val direction = GraphUtil.fromDirection(dir)
+ def getOp(): Byte = op
+ def setOp(newOp: Byte): Unit = op = newOp
+ def getVersion(): Long = version
+ def setVersion(newVersion: Long): Unit = version = newVersion
+ def getTsInnerValOpt(): Option[InnerValLike] = tsInnerValOpt
+ def setTsInnerValOpt(newTsInnerValOpt: Option[InnerValLike]): Unit = tsInnerValOpt = newTsInnerValOpt
+
def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
@@ -196,6 +203,11 @@ trait S2EdgeLike extends Edge {
S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
+ def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = {
+ S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges,
+ Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+ }
+
def rank(r: RankParam): Double =
if (r.keySeqAndWeights.size <= 0) 1.0f
else {
@@ -225,12 +237,12 @@ trait S2EdgeLike extends Edge {
version: Long = version,
propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
parentEdges: Seq[EdgeWithScore] = parentEdges,
- originalEdgeOpt: Option[S2Edge] = originalEdgeOpt,
- pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt,
+ originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt,
+ pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt,
statusCode: Byte = statusCode,
lockTs: Option[Long] = lockTs,
tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
- ts: Long = ts): S2Edge = {
+ ts: Long = ts): S2EdgeLike = {
val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
S2Edge.fillPropsWithTs(edge, propsWithTs)
@@ -238,19 +250,39 @@ trait S2EdgeLike extends Edge {
edge
}
- def copyEdgeWithState(state: State, ts: Long): S2Edge = {
+ def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = {
val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
S2Edge.fillPropsWithTs(newEdge, state)
newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
newEdge
}
- def copyEdgeWithState(state: State): S2Edge = {
+ def copyEdgeWithState(state: State): S2EdgeLike = {
val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
S2Edge.fillPropsWithTs(newEdge, state)
newEdge
}
+ def copyOp(newOp: Byte): S2EdgeLike = {
+ copy(op = newOp)
+ }
+
+ def copyVersion(newVersion: Long): S2EdgeLike = {
+ copy(version = newVersion)
+ }
+
+ def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = {
+ copy(parentEdges = parents)
+ }
+
+ def copyStatusCode(newStatusCode: Byte): S2EdgeLike = {
+ copy(statusCode = newStatusCode)
+ }
+
+ def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = {
+ copy(lockTs = newLockTs)
+ }
+
def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
val arr = new util.ArrayList[Vertex]()
@@ -373,7 +405,13 @@ trait S2EdgeLike extends Edge {
}
}
+ def toLogString: String = {
+ // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+ List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
+ }
+
private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 90190cf..8bb95fc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -201,13 +201,13 @@ object S2Graph {
}
/** common methods to filter out, transform, and aggregate queryResult */
- def convertEdges(queryParam: QueryParam, edge: S2Edge, nextStepOpt: Option[Step]): Seq[S2Edge] = {
+ def convertEdges(queryParam: QueryParam, edge: S2EdgeLike, nextStepOpt: Option[Step]): Seq[S2EdgeLike] = {
for {
convertedEdge <- queryParam.edgeTransformer.transform(queryParam, edge, nextStepOpt) if !edge.isDegree
} yield convertedEdge
}
- def processTimeDecay(queryParam: QueryParam, edge: S2Edge) = {
+ def processTimeDecay(queryParam: QueryParam, edge: S2EdgeLike) = {
/* process time decay */
val tsVal = queryParam.timeDecay match {
case None => 1.0
@@ -258,7 +258,7 @@ object S2Graph {
}
}
- def toHashKey(queryParam: QueryParam, edge: S2Edge, isDegree: Boolean): (HashKey, FilterHashKey) = {
+ def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = {
val src = edge.srcVertex.innerId.hashCode()
val tgt = edge.tgtVertex.innerId.hashCode()
val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree)
@@ -435,7 +435,7 @@ object S2Graph {
val tsVal = processTimeDecay(queryParam, edge)
val newScore = degreeScore + score
// val newEdge = if (queryOption.returnTree) edge.copy(parentEdges = parents) else edge
- val newEdge = edge.copy(parentEdges = parents)
+ val newEdge = edge.copyParentEdges(parents)
edgeWithScore.copy(edge = newEdge, score = newScore * labelWeight * tsVal)
}
@@ -971,7 +971,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def fallback = Future.successful(StepResult.Empty)
- def checkEdges(edges: Seq[S2Edge]): Future[StepResult] = {
+ def checkEdges(edges: Seq[S2EdgeLike]): Future[StepResult] = {
val futures = for {
edge <- edges
} yield {
@@ -1319,11 +1319,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def mutateElements(elements: Seq[GraphElement],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
- val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
+ val edgeBuffer = ArrayBuffer[(S2EdgeLike, Int)]()
val vertexBuffer = ArrayBuffer[(S2VertexLike, Int)]()
elements.zipWithIndex.foreach {
- case (e: S2Edge, idx: Int) => edgeBuffer.append((e, idx))
+ case (e: S2EdgeLike, idx: Int) => edgeBuffer.append((e, idx))
case (v: S2VertexLike, idx: Int) => vertexBuffer.append((v, idx))
case any@_ => logger.error(s"Unknown type: ${any}")
}
@@ -1347,13 +1347,13 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
// def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
- def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+ def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val edgeWithIdxs = edges.zipWithIndex
val (strongEdges, weakEdges) =
edgeWithIdxs.partition { case (edge, idx) =>
val e = edge
- e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+ e.innerLabel.consistencyLevel == "strong" && e.getOp() != GraphUtil.operations("insertBulk")
}
val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
@@ -1365,7 +1365,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
/* multiple edges with weak consistency level will be processed as batch */
val mutations = edges.flatMap { edge =>
val (_, edgeUpdate) =
- if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+ if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
else S2Edge.buildOperation(None, Seq(edge))
val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
@@ -1380,7 +1380,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
Future.sequence(futures)
}
- val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
+ val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") }
val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
@@ -1404,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+ private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = {
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
@@ -1440,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
- private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge],
+ private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike],
checkConsistency: Boolean,
withWait: Boolean): Future[MutateResponse] = {
assert(edges.nonEmpty)
@@ -1495,8 +1495,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
- def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
- def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+ def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
+ def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
val futures = for {
edge <- edges
} yield {
@@ -1523,7 +1523,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = {
+ def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
val storage = getStorage(label)
@@ -1571,11 +1571,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
toVertex(GraphUtil.split(s))
}
- def toEdge(s: String): Option[S2Edge] = {
+ def toEdge(s: String): Option[S2EdgeLike] = {
toEdge(GraphUtil.split(s))
}
- def toEdge(parts: Array[String]): Option[S2Edge] = Try {
+ def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try {
val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
val tempDirection = if (parts.length >= 8) parts(7) else "out"
@@ -1605,7 +1605,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
direction: String,
props: Map[String, Any] = Map.empty,
ts: Long = System.currentTimeMillis(),
- operation: String = "insert"): S2Edge = {
+ operation: String = "insert"): S2EdgeLike = {
val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn
@@ -1649,7 +1649,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
vertex
}
- def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2Edge = {
+ def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = {
val srcVertex = queryRequest.vertex
val queryParam = queryRequest.queryParam
val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt
@@ -1710,11 +1710,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
version: Long = System.currentTimeMillis(),
propsWithTs: S2Edge.State,
parentEdges: Seq[EdgeWithScore] = Nil,
- originalEdgeOpt: Option[S2Edge] = None,
- pendingEdgeOpt: Option[S2Edge] = None,
+ originalEdgeOpt: Option[S2EdgeLike] = None,
+ pendingEdgeOpt: Option[S2EdgeLike] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
- tsInnerValOpt: Option[InnerValLike] = None): S2Edge = {
+ tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = {
val edge = S2Edge(
this,
srcVertex,
@@ -1758,7 +1758,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
op: Byte,
version: Long,
propsWithTs: S2Edge.State,
- pendingEdgeOpt: Option[S2Edge],
+ pendingEdgeOpt: Option[S2EdgeLike],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = {
@@ -1874,7 +1874,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = {
val s2EdgeIds = edgeIds.collect {
- case s2Edge: S2Edge => s2Edge.id().asInstanceOf[EdgeId]
+ case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId]
case id: EdgeId => id
case s: String => EdgeId.fromString(s)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
index e86c17f..50b94de 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -102,7 +102,7 @@ object S2Property {
}
}
-case class S2Property[V](element: S2Edge,
+case class S2Property[V](element: S2EdgeLike,
labelMeta: LabelMeta,
key: String,
v: V,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
index 5a8f722..b88c18d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala
@@ -71,7 +71,7 @@ trait S2VertexLike extends Vertex with GraphElement {
val arr = new util.ArrayList[Vertex]()
edges(direction, edgeLabels: _*).forEachRemaining(new Consumer[Edge] {
override def accept(edge: Edge): Unit = {
- val s2Edge = edge.asInstanceOf[S2Edge]
+ val s2Edge = edge.asInstanceOf[S2EdgeLike]
s2Edge.direction match {
case "out" => arr.add(edge.inVertex())
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
index 098d0b4..e5005b7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala
@@ -29,7 +29,7 @@ import org.apache.lucene.queryparser.classic.{ParseException, QueryParser}
import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.{BaseDirectory, RAMDirectory}
import org.apache.s2graph.core.io.Conversions
-import org.apache.s2graph.core.{EdgeId, S2Edge, S2Vertex, S2VertexLike}
+import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls._
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.s2graph.core.utils.logger
@@ -130,8 +130,8 @@ trait IndexProvider {
def mutateVertices(vertices: Seq[S2VertexLike]): Seq[Boolean]
def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]]
- def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean]
- def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]]
+ def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean]
+ def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]]
def shutdown(): Unit
}
@@ -179,7 +179,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
}
}
- private def toDocument(globalIndex: GlobalIndex, edge: S2Edge): Option[Document] = {
+ private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = {
val props = edge.propsWithTs.asScala
val exist = props.exists(t => globalIndex.propNamesSet(t._1))
if (!exist) None
@@ -222,7 +222,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
vertices.map(_ => true)
}
- override def mutateEdges(edges: Seq[S2Edge]): Seq[Boolean] = {
+ override def mutateEdges(edges: Seq[S2EdgeLike]): Seq[Boolean] = {
val globalIndexOptions = GlobalIndex.findAll(GlobalIndex.EdgeType)
globalIndexOptions.map { globalIndex =>
@@ -316,5 +316,5 @@ class LuceneIndexProvider(config: Config) extends IndexProvider {
override def mutateVerticesAsync(vertices: Seq[S2VertexLike]): Future[Seq[Boolean]] = Future.successful(mutateVertices(vertices))
- override def mutateEdgesAsync(edges: Seq[S2Edge]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
+ override def mutateEdgesAsync(edges: Seq[S2EdgeLike]): Future[Seq[Boolean]] = Future.successful(mutateEdges(edges))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index d754bb7..a0d56b2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.types.InnerValLike
-import org.apache.s2graph.core.{S2Edge, GraphUtil}
+import org.apache.s2graph.core.{GraphUtil, S2Edge, S2EdgeLike}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.utils.logger
@@ -33,7 +33,7 @@ import scala.util.parsing.combinator.JavaTokenParsers
trait ExtractValue {
val parent = "_parent."
- def propToInnerVal(edge: S2Edge, key: String) = {
+ def propToInnerVal(edge: S2EdgeLike, key: String) = {
val (propKey, parentEdge) = findParentEdge(edge, key)
val label = parentEdge.innerLabel
@@ -47,7 +47,7 @@ trait ExtractValue {
}
}
- def valueToCompare(edge: S2Edge, key: String, value: String) = {
+ def valueToCompare(edge: S2EdgeLike, key: String, value: String) = {
val label = edge.innerLabel
if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
else {
@@ -65,11 +65,11 @@ trait ExtractValue {
}
@tailrec
- private def findParent(edge: S2Edge, depth: Int): S2Edge =
+ private def findParent(edge: S2EdgeLike, depth: Int): S2EdgeLike =
if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1)
else edge
- private def findParentEdge(edge: S2Edge, key: String): (String, S2Edge) = {
+ private def findParentEdge(edge: S2EdgeLike, key: String): (String, S2EdgeLike) = {
if (!key.startsWith(parent)) (key, edge)
else {
val split = key.split(parent)
@@ -88,9 +88,9 @@ trait Clause extends ExtractValue {
def or(otherField: Clause): Clause = Or(this, otherField)
- def filter(edge: S2Edge): Boolean
+ def filter(edge: S2EdgeLike): Boolean
- def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2Edge): Boolean = {
+ def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: S2EdgeLike): Boolean = {
val propValue = propToInnerVal(edge, propKey)
val compValue = valueToCompare(edge, propKey, value)
@@ -105,20 +105,20 @@ object Where {
}
}
case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) {
- def filter(edge: S2Edge) =
+ def filter(edge: S2EdgeLike) =
if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity)
}
case class Gt(propKey: String, value: String) extends Clause {
- override def filter(edge: S2Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge)
+ override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ > _)(propKey, value)(edge)
}
case class Lt(propKey: String, value: String) extends Clause {
- override def filter(edge: S2Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge)
+ override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ < _)(propKey, value)(edge)
}
case class Eq(propKey: String, value: String) extends Clause {
- override def filter(edge: S2Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge)
+ override def filter(edge: S2EdgeLike): Boolean = binaryOp(_ == _)(propKey, value)(edge)
}
case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause {
@@ -144,7 +144,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
toInnerVal(value, dataType, label.schemaVersion)
}
- override def filter(edge: S2Edge): Boolean = {
+ override def filter(edge: S2EdgeLike): Boolean = {
if (edge.dir == GraphUtil.directions("in")) {
val propVal = propToInnerVal(edge, propKey)
innerValLikeLsIn.contains(propVal)
@@ -156,7 +156,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e
}
case class IN(propKey: String, values: Set[String]) extends Clause {
- override def filter(edge: S2Edge): Boolean = {
+ override def filter(edge: S2EdgeLike): Boolean = {
val propVal = propToInnerVal(edge, propKey)
values.exists { value =>
valueToCompare(edge, propKey, value) == propVal
@@ -165,7 +165,7 @@ case class IN(propKey: String, values: Set[String]) extends Clause {
}
case class Between(propKey: String, minValue: String, maxValue: String) extends Clause {
- override def filter(edge: S2Edge): Boolean = {
+ override def filter(edge: S2EdgeLike): Boolean = {
val propVal = propToInnerVal(edge, propKey)
val minVal = valueToCompare(edge, propKey, minValue)
val maxVal = valueToCompare(edge, propKey, maxValue)
@@ -175,15 +175,15 @@ case class Between(propKey: String, minValue: String, maxValue: String) extends
}
case class Not(self: Clause) extends Clause {
- override def filter(edge: S2Edge) = !self.filter(edge)
+ override def filter(edge: S2EdgeLike) = !self.filter(edge)
}
case class And(left: Clause, right: Clause) extends Clause {
- override def filter(edge: S2Edge) = left.filter(edge) && right.filter(edge)
+ override def filter(edge: S2EdgeLike) = left.filter(edge) && right.filter(edge)
}
case class Or(left: Clause, right: Clause) extends Clause {
- override def filter(edge: S2Edge) = left.filter(edge) || right.filter(edge)
+ override def filter(edge: S2EdgeLike) = left.filter(edge) || right.filter(edge)
}
object WhereParser {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 55b6e12..6afbd87 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -548,12 +548,12 @@ class RequestParser(graph: S2Graph) {
elementsWithTsv
}
- def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
+ def parseJsonFormat(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = {
val jsValues = toJsValues(jsValue)
jsValues.flatMap(toEdgeWithTsv(_, operation))
}
- private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2Edge, String)] = {
+ private def toEdgeWithTsv(jsValue: JsValue, operation: String): Seq[(S2EdgeLike, String)] = {
val srcIds = (jsValue \ "from").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "froms").asOpt[Seq[JsValue]].getOrElse(Nil)
val tgtIds = (jsValue \ "to").asOpt[JsValue].map(Seq(_)).getOrElse(Nil) ++ (jsValue \ "tos").asOpt[Seq[JsValue]].getOrElse(Nil)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 01dd128..e6075ec 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -93,13 +93,13 @@ abstract class Storage(val graph: S2Graph,
def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] =
io.buildIncrementsCountAsync(indexedEdge, amount)
- def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] =
+ def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] =
io.buildVertexPutsAsync(edge)
def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] =
io.snapshotEdgeMutations(edgeMutate)
- def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] =
+ def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] =
io.buildDegreePuts(edge, degreeVal)
def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] =
@@ -121,15 +121,15 @@ abstract class Storage(val graph: S2Graph,
def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] =
fetcher.fetchVertices(vertices)
- def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll()
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = fetcher.fetchEdgesAll()
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll()
- def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] =
+ def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] =
fetcher.fetchSnapshotEdgeInner(edge)
/** Conflict Resolver **/
- def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] =
+ def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] =
conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt)
/** Management **/
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
index 4014b6d..d0a59b2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala
@@ -32,8 +32,8 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
/** Parsing Logic: parse from kv from Storage into Edge */
def toEdge[K: CanSKeyValue](kv: K,
queryRequest: QueryRequest,
- cacheElementOpt: Option[S2Edge],
- parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+ cacheElementOpt: Option[S2EdgeLike],
+ parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = {
logger.debug(s"toEdge: $kv")
try {
@@ -41,7 +41,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
val queryParam = queryRequest.queryParam
val schemaVer = queryParam.label.schemaVersion
val indexEdgeOpt = serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kv), cacheElementOpt)
- if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges))
+ if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copyParentEdges(parentEdges))
else indexEdgeOpt
} catch {
case ex: Exception =>
@@ -54,7 +54,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
queryRequest: QueryRequest,
cacheElementOpt: Option[SnapshotEdge] = None,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Option[S2Edge] = {
+ parentEdges: Seq[EdgeWithScore]): Option[S2EdgeLike] = {
// logger.debug(s"SnapshottoEdge: $kv")
val queryParam = queryRequest.queryParam
val schemaVer = queryParam.label.schemaVersion
@@ -62,7 +62,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
if (isInnerCall) {
snapshotEdgeOpt.flatMap { snapshotEdge =>
- val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+ val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges)
if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
else None
}
@@ -70,7 +70,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
snapshotEdgeOpt.flatMap { snapshotEdge =>
if (snapshotEdge.allPropsDeleted) None
else {
- val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
+ val edge = snapshotEdge.toEdge.copyParentEdges(parentEdges)
if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
else None
}
@@ -144,7 +144,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
}
val tsVal = processTimeDecay(queryParam, edge)
val newScore = degreeScore + score
- EdgeWithScore(convertedEdge.copy(parentEdges = parentEdges), score = newScore * labelWeight * tsVal, label = label)
+ EdgeWithScore(convertedEdge.copyParentEdges(parentEdges), score = newScore * labelWeight * tsVal, label = label)
}
val sampled =
@@ -229,11 +229,11 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
}
}
- def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = {
+ def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] = {
val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
if (storeVertex) {
- if (edge.op == GraphUtil.operations("delete"))
+ if (edge.getOp() == GraphUtil.operations("delete"))
buildDeleteBelongsToId(edge.srcForVertex) ++ buildDeleteBelongsToId(edge.tgtForVertex)
else
serDe.vertexSerializer(edge.srcForVertex).toKeyValues ++ serDe.vertexSerializer(edge.tgtForVertex).toKeyValues
@@ -242,7 +242,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) {
}
}
- def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = {
+ def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] = {
edge.propertyInner(LabelMeta.degree.name, degreeVal, edge.ts)
val kvs = edge.edgesWithIndexValid.flatMap { indexEdge =>
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index c3b38e8..44bd4dc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -39,16 +39,16 @@ trait StorageReadable {
def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
- def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
- protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+ protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
- def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = {
+ def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
val queryParam = QueryParam(labelName = edge.innerLabel.label,
direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index 227cfa7..854fc18 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -52,7 +52,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
Random.nextInt(Math.min(BackoffTimeout, slot * Math.pow(2, tryNum)).toInt)
}
- def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+ def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
if (tryNum >= MaxRetryNum) {
edges.foreach { edge =>
logger.error(s"commit failed after $MaxRetryNum\n${edge.toLogString}")
@@ -112,9 +112,9 @@ class WriteWriteConflictResolver(graph: S2Graph,
}
}
- protected def commitUpdate(edges: Seq[S2Edge],
+ protected def commitUpdate(edges: Seq[S2EdgeLike],
statusCode: Byte,
- fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = {
+ fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = {
// Future.failed(new PartialFailureException(edges.head, 0, "ahahah"))
assert(edges.nonEmpty)
// assert(statusCode == 0 || fetchedSnapshotEdgeOpt.isDefined)
@@ -135,7 +135,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
assert(edgeMutate.newSnapshotEdge.isDefined)
val lockTs = Option(System.currentTimeMillis())
- val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = squashedEdge.ts + 1)
+ val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(squashedEdge.ts + 1)
val lockSnapshotEdge = squashedEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -158,7 +158,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
Future.successful(true)
} else {
val lockTs = Option(System.currentTimeMillis())
- val pendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+ val pendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1)
val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(pendingEdge))
val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -182,7 +182,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
else S2Edge.buildOperation(fetchedSnapshotEdgeOpt, pendingEdge +: edges)
val lockTs = Option(System.currentTimeMillis())
- val newPendingEdge = squashedEdge.copy(statusCode = 1, lockTs = lockTs, version = snapshotEdge.version + 1)
+ val newPendingEdge = squashedEdge.copyStatusCode(1).copyLockTs(lockTs).copyVersion(snapshotEdge.getVersion() + 1)
val lockSnapshotEdge = snapshotEdge.toSnapshotEdge.copy(pendingEdgeOpt = Option(newPendingEdge))
val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge.get.copy(statusCode = 0,
pendingEdgeOpt = None, version = lockSnapshotEdge.version + 1)
@@ -222,7 +222,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges
else edges
val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges)
- val newVersion = fetchedSnapshotEdgeOpt.map(_.version).getOrElse(squashedEdge.ts) + 2
+ val newVersion = fetchedSnapshotEdgeOpt.map(_.getVersion()).getOrElse(squashedEdge.ts) + 2
val releaseLockSnapshotEdge = edgeMutate.newSnapshotEdge match {
case None => squashedEdge.toSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
case Some(newSnapshotEdge) => newSnapshotEdge.copy(statusCode = 0, pendingEdgeOpt = None, version = newVersion)
@@ -246,8 +246,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
* @return
*/
protected def commitProcess(statusCode: Byte,
- squashedEdge: S2Edge,
- fetchedSnapshotEdgeOpt: Option[S2Edge],
+ squashedEdge: S2EdgeLike,
+ fetchedSnapshotEdgeOpt: Option[S2EdgeLike],
lockSnapshotEdge: SnapshotEdge,
releaseLockSnapshotEdge: SnapshotEdge,
edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
@@ -259,7 +259,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
} yield lockReleased
}
- case class PartialFailureException(edge: S2Edge, statusCode: Byte, failReason: String) extends NoStackException(failReason)
+ case class PartialFailureException(edge: S2EdgeLike, statusCode: Byte, failReason: String) extends NoStackException(failReason)
protected def debug(ret: Boolean, phase: String, snapshotEdge: SnapshotEdge) = {
val msg = Seq(s"[$ret] [$phase]", s"${snapshotEdge.toLogString()}").mkString("\n")
@@ -282,8 +282,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
* @return
*/
protected def acquireLock(statusCode: Byte,
- squashedEdge: S2Edge,
- fetchedSnapshotEdgeOpt: Option[S2Edge],
+ squashedEdge: S2EdgeLike,
+ fetchedSnapshotEdgeOpt: Option[S2EdgeLike],
lockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
if (statusCode >= 1) {
logger.debug(s"skip acquireLock: [$statusCode]\n${squashedEdge.toLogString}")
@@ -334,7 +334,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
*/
protected def releaseLock(predicate: Boolean,
statusCode: Byte,
- squashedEdge: S2Edge,
+ squashedEdge: S2EdgeLike,
releaseLockEdge: SnapshotEdge)(implicit ec: ExecutionContext): Future[Boolean] = {
if (!predicate) {
Future.failed(new PartialFailureException(squashedEdge, 3, "predicate failed."))
@@ -379,7 +379,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
*/
protected def commitIndexEdgeMutations(predicate: Boolean,
statusCode: Byte,
- squashedEdge: S2Edge,
+ squashedEdge: S2EdgeLike,
edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
if (!predicate) Future.failed(new PartialFailureException(squashedEdge, 1, "predicate failed."))
else {
@@ -413,7 +413,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
*/
protected def commitIndexEdgeDegreeMutations(predicate: Boolean,
statusCode: Byte,
- squashedEdge: S2Edge,
+ squashedEdge: S2EdgeLike,
edgeMutate: EdgeMutate)(implicit ec: ExecutionContext): Future[Boolean] = {
def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
@@ -445,8 +445,8 @@ class WriteWriteConflictResolver(graph: S2Graph,
/** end of methods for consistency */
- def mutateLog(snapshotEdgeOpt: Option[S2Edge], edges: Seq[S2Edge],
- newEdge: S2Edge, edgeMutate: EdgeMutate) =
+ def mutateLog(snapshotEdgeOpt: Option[S2EdgeLike], edges: Seq[S2EdgeLike],
+ newEdge: S2EdgeLike, edgeMutate: EdgeMutate) =
Seq("----------------------------------------------",
s"SnapshotEdge: ${snapshotEdgeOpt.map(_.toLogString)}",
s"requestEdges: ${edges.map(_.toLogString).mkString("\n")}",
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 92130f5..6be5f60 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
* @param queryRequest
* @return
*/
- private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
+ private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = {
import Serializable._
val queryParam = queryRequest.queryParam
val label = queryParam.label
@@ -178,7 +178,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
Left(get)
}
- override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = {
+ override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
val rpc = buildRequest(queryRequest, edge)
fetchKeyValues(rpc)
}
@@ -221,7 +221,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
}
}
- override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = {
+ override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
val distinctLabels = labels.toSet
val scan = AsynchbasePatcher.newScanner(client, hTableName)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index 2501ed9..01f268b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -22,10 +22,9 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.{Label, LabelMeta, ServiceColumn}
-import org.apache.s2graph.core.storage.serde._
-import org.apache.s2graph.core.storage.serde.StorageDeserializable._
-import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.storage.CanSKeyValue
+import org.apache.s2graph.core.storage.serde.Deserializable
+import org.apache.s2graph.core.storage.serde.StorageDeserializable._
import org.apache.s2graph.core.types._
object IndexEdgeDeserializable{
@@ -33,13 +32,13 @@ object IndexEdgeDeserializable{
}
class IndexEdgeDeserializable(graph: S2Graph,
bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong,
- tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2Edge] {
+ tallSchemaVersions: Set[String] = Set(HBaseType.VERSION4)) extends Deserializable[S2EdgeLike] {
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
- cacheElementOpt: Option[S2Edge]): Option[S2Edge] = {
+ cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
try {
assert(_kvs.size == 1)
@@ -79,8 +78,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
edge.propertyInner(LabelMeta.timestamp.name, version, version)
edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = GraphUtil.defaultOpByte
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge.setOp(GraphUtil.defaultOpByte)
+ edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
} else {
// not degree edge
val (idxPropsRaw, endAt) =
@@ -150,8 +149,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
edge.propertyInner(LabelMeta.timestamp.name, tsVal, version)
edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = op
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge.setOp(op)
+ edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
}
Option(edge)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 68732ce..a7fe8a1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -28,13 +28,13 @@ import org.apache.s2graph.core.storage.serde.Deserializable
import org.apache.s2graph.core.types._
class IndexEdgeDeserializable(graph: S2Graph,
- bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2Edge] {
+ bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[S2EdgeLike] {
type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
- cacheElementOpt: Option[S2Edge]): Option[S2Edge] = {
+ cacheElementOpt: Option[S2EdgeLike]): Option[S2EdgeLike] = {
try {
assert(_kvs.size == 1)
@@ -68,8 +68,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
edge.propertyInner(LabelMeta.timestamp.name, version, version)
edge.propertyInner(LabelMeta.degree.name, degreeVal, version)
edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = GraphUtil.defaultOpByte
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge.setOp(GraphUtil.defaultOpByte)
+ edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
} else {
pos = 0
val (idxPropsRaw, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
@@ -123,8 +123,8 @@ class IndexEdgeDeserializable(graph: S2Graph,
edge.propertyInner(LabelMeta.timestamp.name, tsVal, version)
edge.tgtVertex = graph.newVertex(tgtVertexId, version)
- edge.op = op
- edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
+ edge.setOp(op)
+ edge.setTsInnerValOpt(Option(InnerVal.withLong(tsVal, schemaVer)))
}
Option(edge)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 5f00b48..24775e7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -52,7 +52,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
snapshotEdge.pendingEdgeOpt match {
case None => valueBytes()
case Some(pendingEdge) =>
- val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+ val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
val versionBytes = Array.empty[Byte]
val propsBytes = pendingEdge.serializePropsWithTs()
val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index df84e86..44f2596 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -59,7 +59,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
snapshotEdge.pendingEdgeOpt match {
case None => valueBytes()
case Some(pendingEdge) =>
- val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
+ val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp())
val versionBytes = Array.empty[Byte]
val propsBytes = pendingEdge.serializePropsWithTs()
val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index ad9299c..55658b9 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -37,7 +37,7 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
val ts = System.currentTimeMillis()
val dummyTs = LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)
- def validate(label: Label)(edge: S2Edge)(sql: String)(expected: Boolean) = {
+ def validate(label: Label)(edge: S2EdgeLike)(sql: String)(expected: Boolean) = {
def debug(whereOpt: Try[Where]) = {
println("==================")
println(s"$whereOpt")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
index d8b2cfa..c7d474b 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/tinkerpop/S2GraphProvider.scala
@@ -37,6 +37,7 @@ import scala.collection.JavaConverters._
object S2GraphProvider {
val Implementation: Set[Class[_]] = Set(
+ classOf[S2EdgeLike],
classOf[S2Edge],
classOf[S2Vertex],
classOf[S2VertexLike],
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/87394b9f/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 101b331..2b31fcd 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -54,7 +54,7 @@ object EdgeController extends Controller {
graphElem match {
case v: S2VertexLike =>
enqueue(kafkaTopic, graphElem, tsv)
- case e: S2Edge =>
+ case e: S2EdgeLike =>
e.innerLabel.extraOptions.get("walLog") match {
case None =>
enqueue(kafkaTopic, e, tsv)
@@ -93,8 +93,8 @@ object EdgeController extends Controller {
val result = s2.mutateElements(elements.map(_._1), true)
result onComplete { results =>
results.get.zip(elements).map {
- case (r: MutateResponse, (e: S2Edge, tsv: String)) if !r.isSuccess =>
- val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){
+ case (r: MutateResponse, (e: S2EdgeLike, tsv: String)) if !r.isSuccess =>
+ val kafkaMessages = if(e.getOp() == GraphUtil.operations("deleteAll")){
toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts)
} else{
Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))