You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/11/19 02:28:46 UTC

[03/23] incubator-s2graph git commit: create S2EdgeLike.

create S2EdgeLike.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7413aad4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7413aad4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7413aad4

Branch: refs/heads/master
Commit: 7413aad429b8f91ff223583ecb7e85688408c76d
Parents: f717023
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 3 13:51:27 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 3 13:51:27 2017 +0900

----------------------------------------------------------------------
 .../scala/org/apache/s2graph/core/S2Edge.scala  | 398 ++-----------------
 .../org/apache/s2graph/core/S2EdgeLike.scala    | 379 ++++++++++++++++++
 2 files changed, 406 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7413aad4/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 641db74..97abd26 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -301,167 +301,25 @@ case class IndexEdge(graph: S2Graph,
   }
 }
 
-case class S2Edge(innerGraph: S2Graph,
-                srcVertex: S2VertexLike,
-                var tgtVertex: S2VertexLike,
-                innerLabel: Label,
-                dir: Int,
-                var op: Byte = GraphUtil.defaultOpByte,
-                var version: Long = System.currentTimeMillis(),
-                propsWithTs: Props = S2Edge.EmptyProps,
-                parentEdges: Seq[EdgeWithScore] = Nil,
-                originalEdgeOpt: Option[S2Edge] = None,
-                pendingEdgeOpt: Option[S2Edge] = None,
-                statusCode: Byte = 0,
-                lockTs: Option[Long] = None,
-                var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with Edge {
-
-  lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
-  lazy val schemaVer = innerLabel.schemaVersion
-  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
-    case b: BigDecimal => b.longValue()
-    case l: Long => l
-    case i: Int => i.toLong
-    case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
-  }
-
-  lazy val operation = GraphUtil.fromOp(op)
-  lazy val tsInnerVal = tsInnerValOpt.get.value
-  lazy val srcId = srcVertex.innerIdVal
-  lazy val tgtId = tgtVertex.innerIdVal
-  lazy val labelName = innerLabel.label
-  lazy val direction = GraphUtil.fromDirection(dir)
-
-  def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
-
-  def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
-  def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
-    val emptyProp = S2Edge.EmptyProps
-
-    propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
-
-    others.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
-
-    emptyProp
-  }
-
-  def propertyValue(key: String): Option[InnerValLikeWithTs] = {
-    key match {
-      case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
-      case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
-      case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
-      case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
-      case _ =>
-        innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
-    }
-  }
-
-  def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
-    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
-    if (propsWithTs.containsKey(labelMeta.name)) {
-      propsWithTs.get(labelMeta.name).innerValWithTs
-    } else {
-      innerLabel.metaPropsDefaultMapInner(labelMeta)
-    }
-  }
-
-  def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
-    val labelMetas = for {
-        key <- keys
-        labelMeta <- innerLabel.metaPropsInvMap.get(key)
-      } yield labelMeta
-
-    propertyValuesInner(labelMetas)
-  }
-
-  def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
-    if (labelMetas.isEmpty) {
-      innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }
-    } else {
-      // This is important since timestamp is required for all edges.
-      (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }.toMap
-    }
-  }
-
-//  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
+case class S2Edge(override val innerGraph: S2Graph,
+                  override val srcVertex: S2VertexLike,
+                  override var tgtVertex: S2VertexLike,
+                  override val innerLabel: Label,
+                  override val dir: Int,
+                  var op: Byte = GraphUtil.defaultOpByte,
+                  var version: Long = System.currentTimeMillis(),
+                  override val propsWithTs: Props = S2Edge.EmptyProps,
+                  override val parentEdges: Seq[EdgeWithScore] = Nil,
+                  override val originalEdgeOpt: Option[S2Edge] = None,
+                  override val pendingEdgeOpt: Option[S2Edge] = None,
+                  override val statusCode: Byte = 0,
+                  override val lockTs: Option[Long] = None,
+                  var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike with GraphElement {
+
+  //  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
   //  assert(propsWithTs.contains(LabelMeta.timeStampSeq))
 
-  lazy val properties = toProps()
-
-  def props = propsWithTs.asScala.mapValues(_.innerVal)
-
-
-  private def toProps(): Map[String, Any] = {
-    for {
-      (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
-    } yield {
-      //      labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
-      val value =
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          propsWithTs.get(labelMeta.name).value
-        } else {
-          defaultVal.innerVal.value
-        }
-      labelMeta.name -> value
-    }
-  }
-
-  def relatedEdges = {
-    if (labelWithDir.isDirected) {
-      val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
-      if (skipReverse) List(this) else List(this, duplicateEdge)
-    } else {
-//      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-//      val base = copy(labelWithDir = outDir)
-      val base = copy(dir = GraphUtil.directions("out"))
-      List(base, base.reverseSrcTgtEdge)
-    }
-  }
-
-  //    def relatedEdges = List(this)
-
-  private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
-      if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
-
-  def srcForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
-      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
-      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def tgtForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
-      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
-      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
-
-//  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
-  def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
-
-  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
 
-  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
 
   override def serviceName = innerLabel.serviceName
 
@@ -471,74 +329,14 @@ case class S2Edge(innerGraph: S2Graph,
 
   override def isAsync = innerLabel.isAsync
 
-  def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
+  //  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
+  //    case Some(_) => props
+  //    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
+  //  }
 
-//  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-//    case Some(_) => props
-//    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
-//  }
-
-  def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
-
-  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  /** force direction as out on invertedEdge */
-  def toSnapshotEdge: SnapshotEdge = {
-    val (smaller, larger) = (srcForVertex, tgtForVertex)
-
-//    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
-
-    propertyInner(LabelMeta.timestamp.name, ts, ts)
-    val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
-      GraphUtil.directions("out"), op, version, propsWithTs,
-      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
-    ret
-  }
-
-  def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
-    "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
-
-  def propsWithName =
-    for {
-      (_, v) <- propsWithTs.asScala
-      meta = v.labelMeta
-      jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-    } yield meta.name -> jsValue
-
-
-  def updateTgtVertex(id: InnerValLike) = {
-    val newId = TargetVertexId(tgtVertex.id.column, id)
-    val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
-    S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def rank(r: RankParam): Double =
-    if (r.keySeqAndWeights.size <= 0) 1.0f
-    else {
-      var sum: Double = 0
-
-      for ((labelMeta, w) <- r.keySeqAndWeights) {
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          val innerValWithTs = propsWithTs.get(labelMeta.name)
-          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
-            case e: Exception =>
-              logger.error("toInnerval failed in rank", e)
-              1.0
-          }
-          sum += w * cost
-        }
-      }
-      sum
-    }
 
   def toLogString: String = {
-//    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
+    //    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
     List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t")
   }
 
@@ -551,12 +349,12 @@ case class S2Edge(innerGraph: S2Graph,
     case _ => false
   }
 
-//  override def toString(): String = {
-//    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
-//      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
-//      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
-//    ).toString
-//  }
+  //  override def toString(): String = {
+  //    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
+  //      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
+  //      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
+  //    ).toString
+  //  }
 
   override def toString: String = {
     // E + L_BRACKET + edge.id() + R_BRACKET + L_BRACKET + edge.outVertex().id() + DASH + edge.label() + ARROW + edge.inVertex().id() + R_BRACKET;
@@ -564,148 +362,6 @@ case class S2Edge(innerGraph: S2Graph,
     //    s"e[${srcForVertex.id}-${innerLabel.label}->${tgtForVertex.id}]"
   }
 
-  def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
-
-  def copyEdge(srcVertex: S2VertexLike = srcVertex,
-               tgtVertex: S2VertexLike = tgtVertex,
-               innerLabel: Label = innerLabel,
-               dir: Int = dir,
-               op: Byte = op,
-               version: Long = version,
-               propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
-               parentEdges: Seq[EdgeWithScore] = parentEdges,
-               originalEdgeOpt: Option[S2Edge] = originalEdgeOpt,
-               pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt,
-               statusCode: Byte = statusCode,
-               lockTs: Option[Long] = lockTs,
-               tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
-               ts: Long = ts): S2Edge = {
-    val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    S2Edge.fillPropsWithTs(edge, propsWithTs)
-    edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
-    edge
-  }
-
-  def copyEdgeWithState(state: State, ts: Long): S2Edge = {
-    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
-    S2Edge.fillPropsWithTs(newEdge, state)
-    newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
-    newEdge
-  }
-
-  def copyEdgeWithState(state: State): S2Edge = {
-    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
-    S2Edge.fillPropsWithTs(newEdge, state)
-    newEdge
-  }
-
-  override def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
-    val arr = new util.ArrayList[Vertex]()
-
-    direction match {
-      case Direction.OUT =>
-//        val newVertexId = this.direction match {
-//          case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
-//          case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
-//          case _ => throw new IllegalArgumentException("direction can only be out/in.")
-//        }
-        val newVertexId = edgeId.srcVertexId
-        innerGraph.getVertex(newVertexId).foreach(arr.add)
-      case Direction.IN =>
-//        val newVertexId = this.direction match {
-//          case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
-//          case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
-//          case _ => throw new IllegalArgumentException("direction can only be out/in.")
-//        }
-        val newVertexId = edgeId.tgtVertexId
-        innerGraph.getVertex(newVertexId).foreach(arr.add)
-      case _ =>
-        import scala.collection.JavaConversions._
-        vertices(Direction.OUT).foreach(arr.add)
-        vertices(Direction.IN).foreach(arr.add)
-    }
-    arr.iterator()
-  }
-
-  override def properties[V](keys: String*): util.Iterator[Property[V]] = {
-    val ls = new util.ArrayList[Property[V]]()
-    if (keys.isEmpty) {
-      propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
-        override def accept(key: String, property: S2Property[_]): Unit = {
-          if (!LabelMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name)
-            ls.add(property.asInstanceOf[S2Property[V]])
-        }
-      })
-    } else {
-      keys.foreach { key =>
-        val prop = property[V](key)
-        if (prop.isPresent) ls.add(prop)
-      }
-    }
-    ls.iterator()
-  }
-
-  override def property[V](key: String): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge."))
-    if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
-    else {
-      Property.empty()
-//      val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
-//      propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
-    }
-  }
-
-  // just for tinkerpop: save to storage, do not use for internal
-  override def property[V](key: String, value: V): Property[V] = {
-    S2Property.assertValidProp(key, value)
-
-    val v = propertyInner(key, value, System.currentTimeMillis())
-    val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis())
-    val newEdge = this.copyEdge(ts = newTs)
-
-    Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout)
-
-    v
-  }
-
-  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
-    val newProp = new S2Property[V](this, labelMeta, key, value, ts)
-    propsWithTs.put(key, newProp)
-    newProp
-  }
-
-  override def remove(): Unit =  {
-    if (graph.features().edge().supportsRemoveEdges()) {
-      val requestTs = System.currentTimeMillis()
-      val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"),
-        version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs)
-      // should we delete related edges also?
-      val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
-      val mutateSuccess = Await.result(future, innerGraph.WaitTimeout)
-      if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.")
-    } else {
-      throw Edge.Exceptions.edgeRemovalNotSupported()
-    }
-  }
-
-  override def graph(): Graph = innerGraph
-
-  lazy val edgeId: EdgeId = {
-    // NOTE: xxxForVertex makes direction to be "out"
-    val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts
-//    EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
-    val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir)
-    if (direction == "out")
-      EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp)
-    else
-      EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp)
-  }
-
-  override def id(): AnyRef = edgeId
-
-  override def label(): String = innerLabel.label
 }
 object EdgeId {
   val EdgeIdDelimiter = ","

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7413aad4/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
new file mode 100644
index 0000000..04ed7ab
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala
@@ -0,0 +1,379 @@
+package org.apache.s2graph.core
+import java.util
+import java.util.function.BiConsumer
+
+import org.apache.s2graph.core.JSONParser.innerValToJsValue
+import org.apache.s2graph.core.S2Edge.{Props, State}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn}
+import org.apache.s2graph.core.types._
+import org.apache.s2graph.core.utils.logger
+import org.apache.tinkerpop.gremlin.structure
+import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex}
+import play.api.libs.json.Json
+
+import scala.concurrent.Await
+import scala.collection.JavaConverters._
+
+trait S2EdgeLike extends Edge {
+  this: S2Edge =>
+
+  val innerGraph: S2Graph
+  val srcVertex: S2VertexLike
+  var tgtVertex: S2VertexLike
+  val innerLabel: Label
+  val dir: Int
+
+//  var op: Byte = GraphUtil.defaultOpByte
+//  var version: Long = System.currentTimeMillis()
+  val propsWithTs: Props = S2Edge.EmptyProps
+  val parentEdges: Seq[EdgeWithScore] = Nil
+  val originalEdgeOpt: Option[S2Edge] = None
+  val pendingEdgeOpt: Option[S2Edge] = None
+  val statusCode: Byte = 0
+  val lockTs: Option[Long] = None
+//  var tsInnerValOpt: Option[InnerValLike] = None
+
+  lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
+  lazy val schemaVer = innerLabel.schemaVersion
+  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
+    case b: BigDecimal => b.longValue()
+    case l: Long => l
+    case i: Int => i.toLong
+    case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
+  }
+  lazy val operation = GraphUtil.fromOp(op)
+  lazy val tsInnerVal = tsInnerValOpt.get.value
+  lazy val srcId = srcVertex.innerIdVal
+  lazy val tgtId = tgtVertex.innerIdVal
+  lazy val labelName = innerLabel.label
+  lazy val direction = GraphUtil.fromDirection(dir)
+
+  def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
+
+  def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
+
+  def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = {
+    val emptyProp = S2Edge.EmptyProps
+
+    propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    others.forEach(new BiConsumer[String, S2Property[_]] {
+      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
+    })
+
+    emptyProp
+  }
+
+  def propertyValue(key: String): Option[InnerValLikeWithTs] = {
+    key match {
+      case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
+      case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
+      case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
+      case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
+      case _ =>
+        innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
+    }
+  }
+
+  def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = {
+    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
+    if (propsWithTs.containsKey(labelMeta.name)) {
+      propsWithTs.get(labelMeta.name).innerValWithTs
+    } else {
+      innerLabel.metaPropsDefaultMapInner(labelMeta)
+    }
+  }
+
+  def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
+    val labelMetas = for {
+      key <- keys
+      labelMeta <- innerLabel.metaPropsInvMap.get(key)
+    } yield labelMeta
+
+    propertyValuesInner(labelMetas)
+  }
+
+  def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
+    if (labelMetas.isEmpty) {
+      innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
+        labelMeta -> propertyValueInner(labelMeta)
+      }
+    } else {
+      // This is important since timestamp is required for all edges.
+      (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
+        labelMeta -> propertyValueInner(labelMeta)
+      }.toMap
+    }
+  }
+
+  lazy val properties = toProps()
+
+  def props = propsWithTs.asScala.mapValues(_.innerVal)
+
+  def relatedEdges = {
+    if (labelWithDir.isDirected) {
+      val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
+      if (skipReverse) List(this) else List(this, duplicateEdge)
+    } else {
+      //      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
+      //      val base = copy(labelWithDir = outDir)
+      val base = copy(dir = GraphUtil.directions("out"))
+      List(base, base.reverseSrcTgtEdge)
+    }
+  }
+
+  def srcForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
+      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
+      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  def tgtForVertex = {
+    val belongLabelIds = Seq(labelWithDir.labelId)
+    if (labelWithDir.dir == GraphUtil.directions("in")) {
+      val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn)
+      innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+    } else {
+      val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn)
+      innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+    }
+  }
+
+  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
+
+  //  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
+  def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
+
+  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
+
+  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
+
+  def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
+
+  def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
+
+  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
+    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+  }
+
+  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
+    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
+  }
+
+  /** force direction as out on invertedEdge */
+  def toSnapshotEdge: SnapshotEdge = {
+    val (smaller, larger) = (srcForVertex, tgtForVertex)
+
+    //    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
+
+    propertyInner(LabelMeta.timestamp.name, ts, ts)
+    val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
+      GraphUtil.directions("out"), op, version, propsWithTs,
+      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
+    ret
+  }
+
+  def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
+    "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
+
+  def propsWithName =
+    for {
+      (_, v) <- propsWithTs.asScala
+      meta = v.labelMeta
+      jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
+    } yield meta.name -> jsValue
+
+  def updateTgtVertex(id: InnerValLike) = {
+    val newId = TargetVertexId(tgtVertex.id.column, id)
+    val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
+    S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
+  }
+
+  def rank(r: RankParam): Double =
+    if (r.keySeqAndWeights.size <= 0) 1.0f
+    else {
+      var sum: Double = 0
+
+      for ((labelMeta, w) <- r.keySeqAndWeights) {
+        if (propsWithTs.containsKey(labelMeta.name)) {
+          val innerValWithTs = propsWithTs.get(labelMeta.name)
+          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
+            case e: Exception =>
+              logger.error("toInnerval failed in rank", e)
+              1.0
+          }
+          sum += w * cost
+        }
+      }
+      sum
+    }
+
+  def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
+
+  def copyEdge(srcVertex: S2VertexLike = srcVertex,
+               tgtVertex: S2VertexLike = tgtVertex,
+               innerLabel: Label = innerLabel,
+               dir: Int = dir,
+               op: Byte = op,
+               version: Long = version,
+               propsWithTs: State = S2Edge.propsToState(this.propsWithTs),
+               parentEdges: Seq[EdgeWithScore] = parentEdges,
+               originalEdgeOpt: Option[S2Edge] = originalEdgeOpt,
+               pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt,
+               statusCode: Byte = statusCode,
+               lockTs: Option[Long] = lockTs,
+               tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
+               ts: Long = ts): S2Edge = {
+    val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps,
+      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
+    S2Edge.fillPropsWithTs(edge, propsWithTs)
+    edge.propertyInner(LabelMeta.timestamp.name, ts, ts)
+    edge
+  }
+
+  def copyEdgeWithState(state: State, ts: Long): S2Edge = {
+    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
+    S2Edge.fillPropsWithTs(newEdge, state)
+    newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts)
+    newEdge
+  }
+
+  def copyEdgeWithState(state: State): S2Edge = {
+    val newEdge = copy(propsWithTs = S2Edge.EmptyProps)
+    S2Edge.fillPropsWithTs(newEdge, state)
+    newEdge
+  }
+
+  def vertices(direction: Direction): util.Iterator[structure.Vertex] = {
+    val arr = new util.ArrayList[Vertex]()
+
+    direction match {
+      case Direction.OUT =>
+        //        val newVertexId = this.direction match {
+        //          case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
+        //          case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
+        //          case _ => throw new IllegalArgumentException("direction can only be out/in.")
+        //        }
+        val newVertexId = edgeId.srcVertexId
+        innerGraph.getVertex(newVertexId).foreach(arr.add)
+      case Direction.IN =>
+        //        val newVertexId = this.direction match {
+        //          case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId)
+        //          case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId)
+        //          case _ => throw new IllegalArgumentException("direction can only be out/in.")
+        //        }
+        val newVertexId = edgeId.tgtVertexId
+        innerGraph.getVertex(newVertexId).foreach(arr.add)
+      case _ =>
+        import scala.collection.JavaConversions._
+        vertices(Direction.OUT).foreach(arr.add)
+        vertices(Direction.IN).foreach(arr.add)
+    }
+    arr.iterator()
+  }
+
+  def properties[V](keys: String*): util.Iterator[Property[V]] = {
+    val ls = new util.ArrayList[Property[V]]()
+    if (keys.isEmpty) {
+      propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
+        override def accept(key: String, property: S2Property[_]): Unit = {
+          if (!LabelMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name)
+            ls.add(property.asInstanceOf[S2Property[V]])
+        }
+      })
+    } else {
+      keys.foreach { key =>
+        val prop = property[V](key)
+        if (prop.isPresent) ls.add(prop)
+      }
+    }
+    ls.iterator()
+  }
+
+  def property[V](key: String): Property[V] = {
+    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge."))
+    if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
+    else {
+      Property.empty()
+      //      val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
+      //      propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
+    }
+  }
+
+  // just for tinkerpop: save to storage, do not use for internal
+  def property[V](key: String, value: V): Property[V] = {
+    S2Property.assertValidProp(key, value)
+
+    val v = propertyInner(key, value, System.currentTimeMillis())
+    val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis())
+    val newEdge = this.copyEdge(ts = newTs)
+
+    Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout)
+
+    v
+  }
+
+  def propertyInner[V](key: String, value: V, ts: Long): Property[V] = {
+    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
+    val newProp = new S2Property[V](this, labelMeta, key, value, ts)
+    propsWithTs.put(key, newProp)
+    newProp
+  }
+
+  def remove(): Unit = {
+    if (graph.features().edge().supportsRemoveEdges()) {
+      val requestTs = System.currentTimeMillis()
+      val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"),
+        version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs)
+      // should we delete related edges also?
+      val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
+      val mutateSuccess = Await.result(future, innerGraph.WaitTimeout)
+      if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.")
+    } else {
+      throw Edge.Exceptions.edgeRemovalNotSupported()
+    }
+  }
+
+  def graph(): Graph = innerGraph
+
+  lazy val edgeId: EdgeId = {
+    // NOTE: xxxForVertex makes direction to be "out"
+    val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts
+    //    EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp)
+    val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir)
+    if (direction == "out")
+      EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp)
+    else
+      EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp)
+  }
+
+  def id(): AnyRef = edgeId
+
+  def label(): String = innerLabel.label
+
+  private def toProps(): Map[String, Any] = {
+    for {
+      (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
+    } yield {
+      //      labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
+      val value =
+        if (propsWithTs.containsKey(labelMeta.name)) {
+          propsWithTs.get(labelMeta.name).value
+        } else {
+          defaultVal.innerVal.value
+        }
+      labelMeta.name -> value
+    }
+  }
+
+  private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) =
+    if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column
+
+}