You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/30 11:48:59 UTC
[1/2] incubator-s2graph git commit: [S2GRAPH-129]: Restrict direct
access on Edge's properties from other classes. - add tp3 as dependencies. -
make propsWithTs as private.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master f7154bac9 -> b89567606
[S2GRAPH-129]: Restrict direct access on Edge's properties from other classes.
- add tp3 as dependencies.
- make propsWithTs as private.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/292174ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/292174ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/292174ec
Branch: refs/heads/master
Commit: 292174ecf8da32604d61cb5f8388b8d9eeb54be3
Parents: f7154ba
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 24 10:46:27 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 24 10:46:27 2016 +0900
----------------------------------------------------------------------
project/Common.scala | 1 +
s2core/build.sbt | 2 +-
.../scala/org/apache/s2graph/core/Edge.scala | 172 ++++++++++++++-----
.../scala/org/apache/s2graph/core/Graph.scala | 65 +++----
.../org/apache/s2graph/core/PostProcess.scala | 11 +-
.../org/apache/s2graph/core/QueryParam.scala | 8 +-
.../org/apache/s2graph/core/QueryResult.scala | 9 +-
.../org/apache/s2graph/core/S2Property.scala | 29 ++++
.../apache/s2graph/core/S2VertexProperty.scala | 28 +++
.../apache/s2graph/core/mysqls/ColumnMeta.scala | 1 +
.../org/apache/s2graph/core/mysqls/Label.scala | 2 +-
.../apache/s2graph/core/mysqls/LabelMeta.scala | 1 +
.../s2graph/core/parsers/WhereParser.scala | 9 +-
.../apache/s2graph/core/storage/Storage.scala | 30 ++--
.../core/storage/hbase/AsynchbaseStorage.scala | 5 +-
.../indexedge/tall/IndexEdgeSerializable.scala | 4 +-
.../indexedge/wide/IndexEdgeSerializable.scala | 4 +-
.../tall/SnapshotEdgeSerializable.scala | 5 +-
.../wide/SnapshotEdgeSerializable.scala | 5 +-
.../s2graph/core/types/InnerValLike.scala | 88 ++++++++++
.../s2graph/core/Integrate/QueryTest.scala | 2 +
.../s2graph/core/parsers/WhereParserTest.scala | 4 +-
.../loader/core/CounterEtlFunctions.scala | 2 +-
.../rest/play/controllers/EdgeController.scala | 4 +-
24 files changed, 347 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/project/Common.scala
----------------------------------------------------------------------
diff --git a/project/Common.scala b/project/Common.scala
index f3dfc68..036d5c9 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -26,6 +26,7 @@ object Common {
val hbaseVersion = "1.2.2"
val hadoopVersion = "2.7.3"
+ val tinkerpopVersion = "3.2.3"
/** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */
val loggingRuntime = Seq(
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 80f37b0..6434acc 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -42,7 +42,7 @@ libraryDependencies ++= Seq(
"io.netty" % "netty" % "3.9.4.Final" force(),
"org.hbase" % "asynchbase" % "1.7.2" excludeLogging(),
"net.bytebuddy" % "byte-buddy" % "1.4.26",
-
+ "org.apache.tinkerpop" % "gremlin-core" % tinkerpopVersion,
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.specs2" %% "specs2-core" % specs2Version % "test"
)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 8a2784d..b27a05e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -19,14 +19,17 @@
package org.apache.s2graph.core
+import java.util
+
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
+import org.apache.tinkerpop.gremlin.structure
import play.api.libs.json.{JsNumber, JsObject, Json}
import scala.util.hashing.MurmurHash3
-
+import org.apache.tinkerpop.gremlin.structure.{Edge => TpEdge, Direction, Property, Graph => TpGraph}
case class SnapshotEdge(srcVertex: Vertex,
tgtVertex: Vertex,
@@ -34,29 +37,33 @@ case class SnapshotEdge(srcVertex: Vertex,
direction: Int,
op: Byte,
version: Long,
- props: Map[LabelMeta, InnerValLikeWithTs],
+ private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
pendingEdgeOpt: Option[Edge],
statusCode: Byte = 0,
lockTs: Option[Long],
tsInnerValOpt: Option[InnerValLike] = None) {
lazy val labelWithDir = LabelWithDirection(label.id.get, direction)
- if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+ if (!propsWithTs.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
// val label = Label.findById(labelWithDir.labelId)
lazy val schemaVer = label.schemaVersion
- lazy val propsWithoutTs = props.mapValues(_.innerVal)
- lazy val ts = props(LabelMeta.timestamp).innerVal.toString().toLong
+ lazy val propsWithoutTs = propsWithTs.mapValues(_.innerVal)
+ lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString().toLong
+
+ def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+
+ def allPropsDeleted = Edge.allPropsDeleted(propsWithTs)
def toEdge: Edge = {
- val ts = props.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version)
+ val ts = propsWithTs.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version)
Edge(srcVertex, tgtVertex, label, direction, op,
- version, props, pendingEdgeOpt = pendingEdgeOpt,
+ version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
}
def propsWithName = (for {
- (meta, v) <- props
+ (meta, v) <- propsWithTs
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
@@ -73,7 +80,7 @@ case class IndexEdge(srcVertex: Vertex,
op: Byte,
version: Long,
labelIndexSeq: Byte,
- props: Map[LabelMeta, InnerValLikeWithTs],
+ private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
tsInnerValOpt: Option[InnerValLike] = None) {
// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
// assert(props.contains(LabelMeta.timeStampSeq))
@@ -82,8 +89,8 @@ case class IndexEdge(srcVertex: Vertex,
lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
lazy val isOutEdge = !isInEdge
- lazy val ts = props(LabelMeta.timestamp).innerVal.toString.toLong
- lazy val degreeEdge = props.contains(LabelMeta.degree)
+ lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString.toLong
+ lazy val degreeEdge = propsWithTs.contains(LabelMeta.degree)
lazy val schemaVer = label.schemaVersion
lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
@@ -96,7 +103,7 @@ case class IndexEdge(srcVertex: Vertex,
/** TODO: make sure call of this class fill props as this assumes */
lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
- props.get(meta) match {
+ propsWithTs.get(meta) match {
case None =>
/**
@@ -118,7 +125,7 @@ case class IndexEdge(srcVertex: Vertex,
}
lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
- lazy val metas = for ((meta, v) <- props if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal
+ lazy val metas = for ((meta, v) <- propsWithTs if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal
// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
@@ -128,52 +135,111 @@ case class IndexEdge(srcVertex: Vertex,
lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
def propsWithName = for {
- (meta, v) <- props if meta.seq >= 0
+ (meta, v) <- propsWithTs if meta.seq >= 0
jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
} yield meta.name -> jsValue
- def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, props, tsInnerValOpt = tsInnerValOpt)
+ def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
// only for debug
def toLogString() = {
List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
}
+
+ def property(key: String): Option[InnerValLikeWithTs] = {
+ label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta))
+ }
+
+ def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
+ propsWithTs.get(labelMeta).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
+ }
+
+ def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
+ if (others.isEmpty) propsWithTs
+ else propsWithTs ++ others
+ }
}
case class Edge(srcVertex: Vertex,
tgtVertex: Vertex,
- label: Label,
+ innerLabel: Label,
dir: Int,
op: Byte = GraphUtil.defaultOpByte,
version: Long = System.currentTimeMillis(),
- propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
+ private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs],
parentEdges: Seq[EdgeWithScore] = Nil,
originalEdgeOpt: Option[Edge] = None,
pendingEdgeOpt: Option[Edge] = None,
statusCode: Byte = 0,
lockTs: Option[Long] = None,
- tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement {
+ tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
- lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
-// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
- // assert(propsWithTs.contains(LabelMeta.timeStampSeq))
- lazy val schemaVer = label.schemaVersion
+ lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
+ lazy val schemaVer = innerLabel.schemaVersion
lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match {
case b: BigDecimal => b.longValue()
case l: Long => l
case i: Int => i.toLong
case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
}
+
//FIXME
lazy val tsInnerVal = tsInnerValOpt.get.value
-// propsWithTs(LabelMeta.timestamp).innerVal.value
-
-// lazy val label = Label.findById(labelWithDir.labelId)
lazy val srcId = srcVertex.innerIdVal
lazy val tgtId = tgtVertex.innerIdVal
- lazy val labelName = label.label
+ lazy val labelName = innerLabel.label
lazy val direction = GraphUtil.fromDirection(dir)
+
+ def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
+
+ def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq)
+
+ def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = {
+ if (others.isEmpty) propsWithTs
+ else propsWithTs ++ others
+ }
+
+ def propertyValue(key: String): Option[InnerValLikeWithTs] = {
+ key match {
+ case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
+ case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
+ case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
+ case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
+ case _ =>
+ innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
+ }
+ }
+
+ def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
+ propsWithTs.getOrElse(labelMeta, innerLabel.metaPropsDefaultMapInner(labelMeta))
+ }
+
+ def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
+ val labelMetas = for {
+ key <- keys
+ labelMeta <- innerLabel.metaPropsInvMap.get(key)
+ } yield labelMeta
+
+ propertyValuesInner(labelMetas)
+ }
+
+ def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
+ if (labelMetas.isEmpty) {
+ innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+ labelMeta -> propertyValueInner(labelMeta)
+ }
+ } else {
+ // This is important since timestamp is required for all edges.
+ (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
+ labelMeta -> propertyValueInner(labelMeta)
+ }.toMap
+ }
+ }
+
+// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+ // assert(propsWithTs.contains(LabelMeta.timeStampSeq))
+
lazy val properties = toProps()
def props = propsWithTs.mapValues(_.innerVal)
@@ -181,7 +247,7 @@ case class Edge(srcVertex: Vertex,
private def toProps(): Map[String, Any] = {
for {
- (labelMeta, defaultVal) <- label.metaPropsDefaultMapInner
+ (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
} yield {
labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value
}
@@ -189,7 +255,7 @@ case class Edge(srcVertex: Vertex,
def relatedEdges = {
if (labelWithDir.isDirected) {
- val skipReverse = label.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+ val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
if (skipReverse) List(this) else List(this, duplicateEdge)
} else {
// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
@@ -204,18 +270,18 @@ case class Edge(srcVertex: Vertex,
def srcForVertex = {
val belongLabelIds = Seq(labelWithDir.labelId)
if (labelWithDir.dir == GraphUtil.directions("in")) {
- Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+ Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
} else {
- Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+ Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
}
}
def tgtForVertex = {
val belongLabelIds = Seq(labelWithDir.labelId)
if (labelWithDir.dir == GraphUtil.directions("in")) {
- Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+ Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
} else {
- Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+ Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
}
}
@@ -228,13 +294,13 @@ case class Edge(srcVertex: Vertex,
def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
- override def serviceName = label.serviceName
+ override def serviceName = innerLabel.serviceName
override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
- override def isAsync = label.isAsync
+ override def isAsync = innerLabel.isAsync
def isDegree = propsWithTs.contains(LabelMeta.degree)
@@ -246,11 +312,11 @@ case class Edge(srcVertex: Vertex,
def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq))
def edgesWithIndex = for (labelOrder <- labelOrders) yield {
- IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
- IndexEdge(srcVertex, tgtVertex, label, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
+ IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
}
/** force direction as out on invertedEdge */
@@ -259,7 +325,7 @@ case class Edge(srcVertex: Vertex,
// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
- val ret = SnapshotEdge(smaller, larger, label, GraphUtil.directions("out"), op, version,
+ val ret = SnapshotEdge(smaller, larger, innerLabel, GraphUtil.directions("out"), op, version,
Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
ret
@@ -278,7 +344,7 @@ case class Edge(srcVertex: Vertex,
}
def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
- "label" -> label.label, "service" -> label.serviceName)
+ "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
def propsWithName =
for {
@@ -290,7 +356,7 @@ case class Edge(srcVertex: Vertex,
def updateTgtVertex(id: InnerValLike) = {
val newId = TargetVertexId(tgtVertex.id.colId, id)
val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
- Edge(srcVertex, newTgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+ Edge(srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
}
def rank(r: RankParam): Double =
@@ -316,8 +382,28 @@ case class Edge(srcVertex: Vertex,
def toLogString: String = {
val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
- List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, allPropsWithName).mkString("\t")
+ List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t")
}
+
+ override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ???
+
+ override def properties[V](strings: String*): util.Iterator[Property[V]] = ???
+
+ override def property[V](key: String): Property[V] = ???
+
+ override def property[V](key: String, value: V): Property[V] = {
+ property(key, value, System.currentTimeMillis())
+ }
+
+ def property[V](key: String, value: V, ts: Long): Property[V] = ???
+
+ override def remove(): Unit = ???
+
+ override def graph(): TpGraph = ???
+
+ override def id(): AnyRef = ???
+
+ override def label(): String = innerLabel.label
}
@@ -399,14 +485,14 @@ object Edge {
val funcs = requestEdges.map { edge =>
if (edge.op == GraphUtil.operations("insert")) {
- edge.label.consistencyLevel match {
+ edge.innerLabel.consistencyLevel match {
case "strong" => Edge.mergeUpsert _
case _ => Edge.mergeInsertBulk _
}
} else if (edge.op == GraphUtil.operations("insertBulk")) {
Edge.mergeInsertBulk _
} else if (edge.op == GraphUtil.operations("delete")) {
- edge.label.consistencyLevel match {
+ edge.innerLabel.consistencyLevel match {
case "strong" => Edge.mergeDelete _
case _ => throw new RuntimeException("not supported")
}
@@ -438,7 +524,7 @@ object Edge {
val maxTs = prevPropsWithTs.map(_._2.ts).max
val newTs = if (maxTs > requestTs) maxTs else requestTs
val propsWithTs = prevPropsWithTs ++
- Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
+ Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs))
val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index ca32a14..a2b17ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -220,7 +220,7 @@ object Graph {
case None => 1.0
case Some(timeDecay) =>
val tsVal = try {
- val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMeta)
+ val innerValWithTsOpt = edge.propertyValue(timeDecay.labelMeta.name)
innerValWithTsOpt.map { innerValWithTs =>
val innerVal = innerValWithTs.innerVal
timeDecay.labelMeta.dataType match {
@@ -324,17 +324,7 @@ object Graph {
val label = edgeWithScore.label
/** Select */
- val mergedPropsWithTs =
- if (queryOption.selectColumns.isEmpty) {
- label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
- labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
- }
- } else {
- val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
- propsSelectColumns.foldLeft(initial) { case (prev, labelMeta) =>
- prev + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, label.metaPropsDefaultMapInner(labelMeta)))
- }
- }
+ val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns)
val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
val newEdgeWithScore = edgeWithScore.copy(edge = newEdge)
@@ -414,19 +404,10 @@ object Graph {
/** Select */
val mergedPropsWithTs =
if (queryOption.selectColumns.isEmpty) {
- label.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
- labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultVal)
- }
+ edge.propertyValuesInner()
} else {
- val initial = Map(LabelMeta.timestamp -> edge.propsWithTs(LabelMeta.timestamp))
- queryOption.selectColumns.foldLeft(initial) { case (acc, labelMetaName) =>
- label.metaPropsDefaultMapInnerString.get(labelMetaName) match {
- case None => acc
- case Some(defaultValue) =>
- val labelMeta = label.metaPropsInvMap(labelMetaName)
- acc + (labelMeta -> edge.propsWithTs.getOrElse(labelMeta, defaultValue))
- }
- }
+ val initial = Map(LabelMeta.timestamp -> edge.propertyValueInner(LabelMeta.timestamp))
+ edge.propertyValues(queryOption.selectColumns) ++ initial
}
val newEdge = edge.copy(propsWithTs = mergedPropsWithTs)
@@ -475,7 +456,7 @@ object Graph {
stepIdx: Int,
stepResultLs: Seq[(QueryRequest, StepResult)],
parentEdges: Map[VertexId, Seq[EdgeWithScore]])
- (createFunc: (EdgeWithScore, Set[LabelMeta]) => T)
+ (createFunc: (EdgeWithScore, Seq[LabelMeta]) => T)
(implicit ev: WithScore[T]): ListBuffer[T] = {
import scala.collection._
@@ -500,7 +481,7 @@ object Graph {
val propsSelectColumns = (for {
column <- queryOption.propsSelectColumns
labelMeta <- label.metaPropsInvMap.get(column)
- } yield labelMeta).toSet
+ } yield labelMeta)
for {
edgeWithScore <- toEdgeWithScores(queryRequest, stepInnerResult, parentEdges)
@@ -831,14 +812,14 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
/** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
* so use empty cacheKey.
* */
- val queryParam = QueryParam(labelName = edge.label.label,
+ val queryParam = QueryParam(labelName = edge.innerLabel.label,
direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
cacheTTLInMillis = -1)
val q = Query.toQuery(Seq(edge.srcVertex), queryParam)
val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
- val storage = getStorage(edge.label)
+ val storage = getStorage(edge.innerLabel)
storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
val (edgeOpt, kvOpt) =
if (kvs.isEmpty) (None, None)
@@ -927,7 +908,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
if deleteStepInnerResult.edgeWithScores.nonEmpty
} yield {
val head = deleteStepInnerResult.edgeWithScores.head
- val label = head.edge.label
+ val label = head.edge.innerLabel
val ret = label.schemaVersion match {
case HBaseType.VERSION3 | HBaseType.VERSION4 =>
if (label.consistencyLevel == "strong") {
@@ -965,16 +946,18 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
if (filtered.isEmpty) StepResult.Empty
else {
val head = filtered.head
- val label = head.edge.label
+ val label = head.edge.innerLabel
val edgeWithScoreLs = filtered.map { edgeWithScore =>
val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
case "strong" =>
- val _newPropsWithTs = edgeWithScore.edge.propsWithTs ++
- Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+ val _newPropsWithTs = edgeWithScore.edge.updatePropsWithTs(
+ Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion))
+ )
+
(GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
case _ =>
val oldEdge = edgeWithScore.edge
- (oldEdge.op, oldEdge.version, oldEdge.propsWithTs)
+ (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
}
val copiedEdge =
@@ -1029,11 +1012,11 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
val (strongEdges, weakEdges) =
edgeWithIdxs.partition { case (edge, idx) =>
val e = edge
- e.label.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
+ e.innerLabel.consistencyLevel == "strong" && e.op != GraphUtil.operations("insertBulk")
}
- val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.label.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
- val futures = edgeWithIdxs.groupBy(_._1.label).map { case (label, edgeGroup) =>
+ val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
+ val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
val storage = getStorage(label)
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
@@ -1056,10 +1039,10 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.op == GraphUtil.operations("deleteAll") }
val deleteAllFutures = strongDeleteAll.map { case (edge, idx) =>
- deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.label), edge.labelWithDir.dir, edge.ts).map(idx -> _)
+ deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _)
}
- val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.label }.map { case (label, edgeGroup) =>
+ val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
val storage = getStorage(label)
@@ -1087,19 +1070,19 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) {
def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
val edgesWithIdx = edges.zipWithIndex
- val futures = edgesWithIdx.groupBy { case (e, idx) => e.label }.map { case (label, edgeGroup) =>
+ val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
}
def updateDegree(edge: Edge, degreeVal: Long = 0): Future[Boolean] = {
- val label = edge.label
+ val label = edge.innerLabel
val storage = getStorage(label)
val kvs = storage.buildDegreePuts(edge, degreeVal)
- storage.writeToStorage(edge.label.service.cluster, kvs, withWait = true)
+ storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true)
}
def shutdown(): Unit = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 6c8563c..083159f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
@@ -83,7 +83,7 @@ object PostProcess {
builder += ("from" -> anyValToJsValue(s2Edge.srcId).get)
builder += ("label" -> anyValToJsValue(label.label).get)
builder += ("direction" -> anyValToJsValue(s2Edge.direction).get)
- builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propsWithTs(LabelMeta.degree).innerVal.value).get)
+ builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propertyValueInner(LabelMeta.degree).innerVal.value).get)
JsObject(builder)
} else {
if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get)
@@ -95,7 +95,7 @@ object PostProcess {
val innerProps = ArrayBuffer.empty[(String, JsValue)]
for {
- (labelMeta, v) <- edgeWithScore.edge.propsWithTs
+ (labelMeta, v) <- edgeWithScore.edge.propertyValues()
jsValue <- anyValToJsValue(v.innerVal.value)
} {
innerProps += (labelMeta.name -> jsValue)
@@ -126,9 +126,10 @@ object PostProcess {
}
val innerProps = ArrayBuffer.empty[(String, JsValue)]
for {
- (labelMeta, v) <- edgeWithScore.edge.propsWithTs
- if !checkSelectColumns || queryOption.selectColumnsMap.contains(labelMeta.name)
- jsValue <- anyValToJsValue(v.innerVal.value)
+ (selectColumnName, _) <- queryOption.selectColumnsMap
+ labelMeta <- label.metaPropsInvMap.get(selectColumnName)
+ innerValWithTs = edgeWithScore.edge.propertyValueInner(labelMeta)
+ jsValue <- anyValToJsValue(innerValWithTs.innerVal.value)
} {
innerProps += (labelMeta.name -> jsValue)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index b481880..7b10709 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -166,11 +166,7 @@ case class EdgeTransformer(jsValue: JsValue) {
fieldName match {
case LabelMeta.to.name => Option(edge.tgtVertex.innerId)
case LabelMeta.from.name => Option(edge.srcVertex.innerId)
- case _ =>
- for {
- labelMeta <- queryParam.label.metaPropsInvMap.get(fieldName)
- value <- edge.propsWithTs.get(labelMeta)
- } yield value.innerVal
+ case _ => edge.propertyValue(fieldName).map(_.innerVal)
}
}
@@ -376,7 +372,7 @@ case class QueryParam(labelName: String,
val propKey = _propKey.split("_parent.").last
val padding = Try(_padding.trim.toLong).getOrElse(0L)
- val labelMeta = edge.label.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in ${edge} labelMetas."))
+ val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(propKey, throw new RuntimeException(s"$propKey not found in ${edge} labelMetas."))
val propVal =
if (InnerVal.isNumericType(labelMeta.dataType)) {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
index dd7e45d..d8416c2 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala
@@ -83,15 +83,8 @@ case class EdgeWithScore(edge: Edge,
accumulatedScores: Map[String, Double] = Map.empty) {
def toValue(keyName: String): Option[Any] = keyName match {
- case "from" | "_from" => Option(edge.srcId)
- case "to" | "_to" => Option(edge.tgtId)
- case "label" => Option(label.label)
- case "direction" => Option(edge.dir)
case "score" => Option(score)
- case _ =>
- label.metaPropsInvMap.get(keyName).flatMap { labelMeta =>
- edge.propsWithTs.get(labelMeta).orElse(label.metaPropsDefaultMapInner.get(labelMeta)).map(_.innerVal.value)
- }
+ case _ => edge.propertyValue(keyName).map(_.innerVal.value)
}
def toValues(keyNames: Seq[String]): Seq[Option[Any]] = for {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
new file mode 100644
index 0000000..67a9d4c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala
@@ -0,0 +1,29 @@
+package org.apache.s2graph.core
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.types.CanInnerValLike
+import org.apache.tinkerpop.gremlin.structure.{Element, Property}
+
+
+case class S2Property[V](element: Element,
+ labelMeta: LabelMeta,
+ key: String,
+ value: V,
+ ts: Long = System.currentTimeMillis()) extends Property[V] {
+
+ import CanInnerValLike._
+ lazy val innerVal = anyToInnerValLike.toInnerVal(value, labelMeta.label.schemaVersion)
+
+ def bytes: Array[Byte] = {
+ innerVal.bytes
+ }
+
+ def bytesWithTs: Array[Byte] = {
+ Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
+ }
+
+ override def isPresent: Boolean = ???
+
+ override def remove(): Unit = ???
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
new file mode 100644
index 0000000..e6da3f6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexProperty.scala
@@ -0,0 +1,28 @@
+package org.apache.s2graph.core
+
+import java.util
+
+import org.apache.s2graph.core.mysqls.ColumnMeta
+import org.apache.s2graph.core.types.CanInnerValLike
+import org.apache.tinkerpop.gremlin.structure.{Property, VertexProperty, Vertex => TpVertex}
+
+case class S2VertexProperty[V](element: TpVertex,
+ columnMeta: ColumnMeta,
+ key: String,
+ value: V) extends VertexProperty[V] {
+ implicit val encodingVer = columnMeta.serviceColumn.schemaVersion
+ val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(value)
+ def toBytes: Array[Byte] = {
+ innerVal.bytes
+ }
+
+ override def properties[U](strings: String*): util.Iterator[Property[U]] = ???
+
+ override def property[V](s: String, v: V): Property[V] = ???
+
+ override def remove(): Unit = ???
+
+ override def id(): AnyRef = ???
+
+ override def isPresent: Boolean = ???
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
index 9af6243..f6c174d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ColumnMeta.scala
@@ -125,5 +125,6 @@ object ColumnMeta extends Model[ColumnMeta] {
}
case class ColumnMeta(id: Option[Int], columnId: Int, name: String, seq: Byte, dataType: String) {
+ lazy val serviceColumn = ServiceColumn.findById(columnId)
lazy val toJson = Json.obj("name" -> name, "dataType" -> dataType)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
index 09d15d7..4970912 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
@@ -362,7 +362,7 @@ case class Label(id: Option[Int], label: String,
} yield prop.name -> innerVal).toMap
lazy val metaPropsDefaultMapInner = (for {
- prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
+ prop <- metaPropsInner
innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
} yield prop -> innerVal).toMap
lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
index 4a7e931..6636649 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
@@ -191,6 +191,7 @@ case class LabelMeta(id: Option[Int],
seq: Byte,
defaultValue: String,
dataType: String) {
+ lazy val label = Label.findById(labelId)
lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType)
override def equals(other: Any): Boolean = {
if (!other.isInstanceOf[LabelMeta]) false
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index effb94b..aa018a9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -36,22 +36,19 @@ trait ExtractValue {
def propToInnerVal(edge: Edge, key: String) = {
val (propKey, parentEdge) = findParentEdge(edge, key)
- val label = parentEdge.label
+ val label = parentEdge.innerLabel
val metaPropInvMap = label.metaPropsInvMap
val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey"))
labelMeta match {
case LabelMeta.from => parentEdge.srcVertex.innerId
case LabelMeta.to => parentEdge.tgtVertex.innerId
- case _ => parentEdge.propsWithTs.get(labelMeta) match {
- case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion)
- case Some(edgeVal) => edgeVal.innerVal
- }
+ case _ => parentEdge.propertyValueInner(labelMeta).innerVal
}
}
def valueToCompare(edge: Edge, key: String, value: String) = {
- val label = edge.label
+ val label = edge.innerLabel
if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value)
else {
val (propKey, _) = findParentEdge(edge, key)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index f2b07cd..b1ef11d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -301,7 +301,7 @@ abstract class Storage[Q, R](val graph: Graph,
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.label, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+ (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
} toSeq
val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
@@ -314,7 +314,7 @@ abstract class Storage[Q, R](val graph: Graph,
//TODO: decide what we will do on failure on vertex put
val puts = buildVertexPutsAsync(head)
- val vertexFuture = writeToStorage(head.label.hbaseZkAddr, puts, withWait)
+ val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
Seq(edgeFuture, vertexFuture)
case Nil => Nil
}
@@ -358,7 +358,7 @@ abstract class Storage[Q, R](val graph: Graph,
// TODO:: remove after code review: unreachable code
if (!checkConsistency) {
- val zkQuorum = edges.head.label.hbaseZkAddr
+ val zkQuorum = edges.head.innerLabel.hbaseZkAddr
val futures = edges.map { edge =>
val (_, edgeUpdate) = Edge.buildOperation(None, Seq(edge))
@@ -672,7 +672,7 @@ abstract class Storage[Q, R](val graph: Graph,
if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 3, s"$p"))
else {
val releaseLockEdgePuts = snapshotEdgeSerializer(releaseLockEdge).toKeyValues
- writeToStorage(squashedEdge.label.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith {
+ writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, releaseLockEdgePuts, withWait = true).recoverWith {
case ex: Exception =>
logger.error(s"ReleaseLock RPC Failed.")
throw new PartialFailureException(squashedEdge, 3, "ReleaseLock RPC Failed")
@@ -719,7 +719,7 @@ abstract class Storage[Q, R](val graph: Graph,
val p = Random.nextDouble()
if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 1, s"$p"))
else
- writeToStorage(squashedEdge.label.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
+ writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, indexedEdgeMutations(edgeMutate), withWait = true).map { ret =>
if (ret) {
debug(ret, "mutate", squashedEdge.toSnapshotEdge, edgeMutate)
} else {
@@ -746,7 +746,7 @@ abstract class Storage[Q, R](val graph: Graph,
edgeMutate: EdgeMutate): Future[Boolean] = {
def _write(kvs: Seq[SKeyValue], withWait: Boolean): Future[Boolean] = {
- writeToStorage(squashedEdge.label.hbaseZkAddr, kvs, withWait = withWait).map { ret =>
+ writeToStorage(squashedEdge.innerLabel.hbaseZkAddr, kvs, withWait = withWait).map { ret =>
if (ret) {
debug(ret, "increment", squashedEdge.toSnapshotEdge, edgeMutate)
} else {
@@ -789,7 +789,7 @@ abstract class Storage[Q, R](val graph: Graph,
if (stepInnerResult.isEmpty) Future.successful(true)
else {
val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.label.hbaseZkAddr
+ val zkQuorum = head.edge.innerLabel.hbaseZkAddr
val futures = for {
edgeWithScore <- stepInnerResult.edgeWithScores
} yield {
@@ -857,7 +857,7 @@ abstract class Storage[Q, R](val graph: Graph,
}
} else {
snapshotEdgeOpt.flatMap { snapshotEdge =>
- if (Edge.allPropsDeleted(snapshotEdge.props)) None
+ if (snapshotEdge.allPropsDeleted) None
else {
val edge = snapshotEdge.toEdge.copy(parentEdges = parentEdges)
if (queryParam.where.map(_.filter(edge)).getOrElse(true)) Option(edge)
@@ -982,7 +982,7 @@ abstract class Storage[Q, R](val graph: Graph,
/** TODO: Fix this. currently fetchSnapshotEdge should not use future cache
* so use empty cacheKey.
* */
- val queryParam = QueryParam(labelName = edge.label.label,
+ val queryParam = QueryParam(labelName = edge.innerLabel.label,
direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
cacheTTLInMillis = -1)
@@ -1075,14 +1075,14 @@ abstract class Storage[Q, R](val graph: Graph,
/** IndexEdge */
def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.props ++ Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
- val _indexedEdge = indexedEdge.copy(props = newProps)
+ val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+ val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
}
def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = {
- val newProps = indexedEdge.props ++ Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))
- val _indexedEdge = indexedEdge.copy(props = newProps)
+ val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer)))
+ val _indexedEdge = indexedEdge.copy(propsWithTs = newProps)
indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability))
}
@@ -1096,7 +1096,7 @@ abstract class Storage[Q, R](val graph: Graph,
}
def buildVertexPutsAsync(edge: Edge): Seq[SKeyValue] = {
- val storeVertex = edge.label.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
+ val storeVertex = edge.innerLabel.extraOptions.get("storeVertex").map(_.as[Boolean]).getOrElse(false)
if (storeVertex) {
if (edge.op == GraphUtil.operations("delete"))
@@ -1111,7 +1111,7 @@ abstract class Storage[Q, R](val graph: Graph,
def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = {
val kvs = edge.edgesWithIndexValid.flatMap { _indexEdge =>
val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer))
- val indexEdge = _indexEdge.copy(props = newProps)
+ val indexEdge = _indexEdge.copy(propsWithTs = newProps)
indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index e63dfea..b0287d5 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -251,8 +251,7 @@ class AsynchbaseStorage(override val graph: Graph,
val snapshotEdge = edge.toSnapshotEdge
snapshotEdgeSerializer(snapshotEdge)
} else {
- val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.label, edge.dir,
- edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
+ val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
indexEdgeSerializer(indexEdge)
}
@@ -435,7 +434,7 @@ class AsynchbaseStorage(override val graph: Graph,
relEdge <- edge.relatedEdges
edgeWithIndex <- relEdge.edgesWithIndexValid
} yield {
- val countWithTs = edge.propsWithTs(LabelMeta.count)
+ val countWithTs = edge.propertyValueInner(LabelMeta.count)
val countVal = countWithTs.innerVal.toString().toLong
val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index cd242dc..2d49c11 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -59,9 +59,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byt
override def toValue: Array[Byte] =
if (indexEdge.degreeEdge)
- longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
+ longToBytes(indexEdge.property(LabelMeta.degree).innerVal.toString().toLong)
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
+ longToBytes(indexEdge.property(LabelMeta.count).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index 211b159..f85159b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -60,9 +60,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byt
override def toValue: Array[Byte] =
if (indexEdge.degreeEdge)
- longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
+ longToBytes(indexEdge.property(LabelMeta.degree).innerVal.toString().toLong)
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
+ longToBytes(indexEdge.property(LabelMeta.count).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index fc84469..e71760d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -36,8 +36,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
val byte = (((statusCode << 4) | op).toByte)
Array.fill(1)(byte.toByte)
}
- def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
- propsToKeyValuesWithTs(snapshotEdge.props.toList))
+ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), snapshotEdge.propsToKeyValuesWithTs)
override def toRowKey: Array[Byte] = {
val srcIdAndTgtIdBytes = SourceAndTargetVertexIdPair(snapshotEdge.srcVertex.innerId, snapshotEdge.tgtVertex.innerId).bytes
@@ -55,7 +54,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
case Some(pendingEdge) =>
val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
val versionBytes = Array.empty[Byte]
- val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+ val propsBytes = pendingEdge.serializePropsWithTs()
val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 4ceb4a8..ee2645a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -41,8 +41,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
val byte = (((statusCode << 4) | op).toByte)
Array.fill(1)(byte.toByte)
}
- def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op),
- propsToKeyValuesWithTs(snapshotEdge.props.toList))
+ def valueBytes() = Bytes.add(statusCodeWithOp(snapshotEdge.statusCode, snapshotEdge.op), snapshotEdge.propsToKeyValuesWithTs)
override def toRowKey: Array[Byte] = {
@@ -62,7 +61,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[
case Some(pendingEdge) =>
val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.op)
val versionBytes = Array.empty[Byte]
- val propsBytes = propsToKeyValuesWithTs(pendingEdge.propsWithTs.toSeq)
+ val propsBytes = pendingEdge.serializePropsWithTs()
val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get)
Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes))
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
index 1c58086..d90cf8e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/InnerValLike.scala
@@ -261,3 +261,91 @@ case class InnerValLikeWithTs(innerVal: InnerValLike, ts: Long)
Bytes.add(innerVal.bytes, Bytes.toBytes(ts))
}
}
+
+trait CanInnerValLike[A] {
+ def toInnerVal(element: A)(implicit encodingVer: String): InnerValLike
+}
+object CanInnerValLike {
+ implicit val encodingVer = "v2"
+
+ def validate(element: Any, classType: String): Boolean = {
+ import InnerVal._
+ classType match {
+ case BLOB => element.isInstanceOf[Array[Byte]]
+ case STRING => element.isInstanceOf[String]
+ case DOUBLE => element.isInstanceOf[Double] || element.isInstanceOf[BigDecimal]
+ case FLOAT => element.isInstanceOf[Float] || element.isInstanceOf[BigDecimal]
+ case LONG => element.isInstanceOf[Long] || element.isInstanceOf[BigDecimal]
+ case INT => element.isInstanceOf[Int] || element.isInstanceOf[BigDecimal]
+ case SHORT => element.isInstanceOf[Short] || element.isInstanceOf[BigDecimal]
+ case BYTE => element.isInstanceOf[Byte] || element.isInstanceOf[BigDecimal]
+ case BOOLEAN => element.isInstanceOf[Boolean]
+ case _ => throw new RuntimeException(s"not supported data type: $element, $classType")
+ }
+ }
+ implicit val anyToInnerValLike = new CanInnerValLike[Any] {
+ override def toInnerVal(element: Any)(implicit encodingVer: String): InnerValLike = {
+ element match {
+ case i: InnerValLike => i
+ case s: String => stringToInnerValLike.toInnerVal(s)
+ case i: Int => intToInnerValLike.toInnerVal(i)
+ case l: Long => longToInnerValLike.toInnerVal(l)
+ case f: Float => floatToInnerValLike.toInnerVal(f)
+ case d: Double => doubleToInnerValLike.toInnerVal(d)
+ case b: BigDecimal => bigDecimalToInnerValLike.toInnerVal(b)
+ case b: Boolean => booleanToInnerValLike.toInnerVal(b)
+ case b: Array[Byte] => blobToInnerValLike.toInnerVal(b)
+ case _ => throw new RuntimeException(s"not supported element type: $element, ${element.getClass}")
+ }
+ }
+ }
+ implicit val innerValLikeToInnerValLike = new CanInnerValLike[InnerValLike] {
+ override def toInnerVal(element: InnerValLike)(implicit encodingVer: String): InnerValLike = element
+ }
+ implicit val objectToInnerValLike = new CanInnerValLike[Object] {
+ override def toInnerVal(element: Object)(implicit encodingVer: String): InnerValLike = {
+ anyToInnerValLike.toInnerVal(element.asInstanceOf[Any])
+ }
+ }
+
+ implicit val stringToInnerValLike = new CanInnerValLike[String] {
+ override def toInnerVal(element: String)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withStr(element, encodingVer)
+ }
+ }
+ implicit val longToInnerValLike = new CanInnerValLike[Long] {
+ override def toInnerVal(element: Long)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withLong(element, encodingVer)
+ }
+ }
+ implicit val intToInnerValLike = new CanInnerValLike[Int] {
+ override def toInnerVal(element: Int)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withInt(element, encodingVer)
+ }
+ }
+ implicit val floatToInnerValLike = new CanInnerValLike[Float] {
+ override def toInnerVal(element: Float)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withFloat(element, encodingVer)
+ }
+ }
+ implicit val doubleToInnerValLike = new CanInnerValLike[Double] {
+ override def toInnerVal(element: Double)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withDouble(element, encodingVer)
+ }
+ }
+ implicit val bigDecimalToInnerValLike = new CanInnerValLike[BigDecimal] {
+ override def toInnerVal(element: BigDecimal)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withNumber(element, encodingVer)
+ }
+ }
+ implicit val booleanToInnerValLike = new CanInnerValLike[Boolean] {
+ override def toInnerVal(element: Boolean)(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withBoolean(element, encodingVer)
+ }
+ }
+ implicit val blobToInnerValLike = new CanInnerValLike[Array[Byte]] {
+ override def toInnerVal(element: Array[Byte])(implicit encodingVer: String): InnerValLike = {
+ InnerVal.withBlob(element, encodingVer)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index 34f4d2c..f58b192 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -133,6 +133,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
queryOption = QueryOption(groupBy = GroupBy(props, 100))
)
+
test("query with defaultValue") {
// ref: edges from initTestData()
@@ -436,6 +437,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
// test parent With select fields
var result = TestUtil.getEdgesSync(queryParents(src))
+ println(s"$result")
var parents = (result \ "results").as[Seq[JsValue]]
var ret = parents.forall { edge =>
val parentEdges = (edge \ "parents").as[Seq[JsValue]]
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 042dce2..d70a08b 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -180,9 +180,9 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
val grandParentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner)
val parentEdge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, parentPropsInner,
- parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.label)))
+ parentEdges = Seq(EdgeWithScore(grandParentEdge, 1.0, grandParentEdge.innerLabel)))
val edge = Edge(srcVertex, tgtVertex, label, labelWithDir.dir, 0.toByte, ts, propsInner,
- parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.label)))
+ parentEdges = Seq(EdgeWithScore(parentEdge, 1.0, grandParentEdge.innerLabel)))
println(edge.toString)
println(parentEdge.toString)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
----------------------------------------------------------------------
diff --git a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
index 247cd07..cca3a59 100644
--- a/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
+++ b/s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/CounterEtlFunctions.scala
@@ -54,7 +54,7 @@ object CounterEtlFunctions extends Logging {
filterOps.contains(x.op)
}
} yield {
- val label = edge.label
+ val label = edge.innerLabel
val labelName = label.label
val tgtService = label.tgtColumn.service.serviceName
val tgtId = edge.tgtVertex.innerId.toString()
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/292174ec/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
index 835cc72..b1635fb 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala
@@ -54,7 +54,7 @@ object EdgeController extends Controller {
case v: Vertex =>
enqueue(kafkaTopic, graphElem, tsv)
case e: Edge =>
- e.label.extraOptions.get("walLog") match {
+ e.innerLabel.extraOptions.get("walLog") match {
case None =>
enqueue(kafkaTopic, e, tsv)
case Some(walLogOpt) =>
@@ -94,7 +94,7 @@ object EdgeController extends Controller {
results.get.zip(elements).map {
case (false, (e: Edge, tsv: String)) =>
val kafkaMessages = if(e.op == GraphUtil.operations("deleteAll")){
- toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.label), e.labelWithDir.dir, e.ts)
+ toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts)
} else{
Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv)))
}
[2/2] incubator-s2graph git commit: [S2GRAPH-129]: Restrict direct
access on Edge's properties from other classes.
Posted by st...@apache.org.
[S2GRAPH-129]: Restrict direct access on Edge's properties from other classes.
JIRA:
[S2GRAPH-123] https://issues.apache.org/jira/browse/S2GRAPH-129
Pull Request:
Closes #99
Authors
DO YUNG YOON: steamshon@apache.org
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b8956760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b8956760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b8956760
Branch: refs/heads/master
Commit: b895676063916cb76c4198c30598b60dd90cb668
Parents: 292174e
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed Nov 30 20:48:48 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed Nov 30 20:48:48 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8956760/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1c5c01e..e76cb59 100644
--- a/CHANGES
+++ b/CHANGES
@@ -228,6 +228,8 @@ Release 0.1.0 - unreleased
S2GRAPH-114: `MethodNotSupportedException` class in s2counter_core project miss license header (Committed by DOYUNG YOON).
+ S2GRAPH-129: Restrict direct access on Edge's properties from other classes. (Committed by DOYUNG YOON).
+
TEST
S2GRAPH-21: Change PostProcessBenchmarkSpec not to store and fetch test data from storage. (Committed by DOYUNG YOON).