You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/12/01 08:30:15 UTC

[05/10] incubator-s2graph git commit: [S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package. - Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph. - Implement base interfaces for tinkerpop3 structure packag

[S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package.
 - Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph.
 - Implement base interfaces for tinkerpop3 structure package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8c0bf20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8c0bf20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8c0bf20

Branch: refs/heads/master
Commit: e8c0bf20b517a2e9a752df63c156fc265b7365b6
Parents: 189bc41
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Nov 28 12:09:52 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Nov 28 12:09:52 2016 +0900

----------------------------------------------------------------------
 .../loader/subscriber/GraphSubscriber.scala     |   14 +-
 .../loader/subscriber/TransferToHFile.scala     |    6 +-
 .../s2graph/loader/subscriber/WalLogStat.scala  |    2 +-
 .../loader/subscriber/WalLogToHDFS.scala        |    2 +-
 .../scala/org/apache/s2graph/core/Edge.scala    |  956 ------------
 .../scala/org/apache/s2graph/core/Graph.scala   | 1238 ----------------
 .../org/apache/s2graph/core/Management.scala    |    2 +-
 .../org/apache/s2graph/core/PostProcess.scala   |    8 +-
 .../org/apache/s2graph/core/QueryParam.scala    |   23 +-
 .../org/apache/s2graph/core/QueryResult.scala   |   10 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |  969 ++++++++++++
 .../scala/org/apache/s2graph/core/S2Graph.scala | 1397 ++++++++++++++++++
 .../org/apache/s2graph/core/S2Property.scala    |    7 +-
 .../org/apache/s2graph/core/S2Vertex.scala      |  221 +++
 .../apache/s2graph/core/S2VertexProperty.scala  |   31 +-
 .../scala/org/apache/s2graph/core/Vertex.scala  |  132 --
 .../apache/s2graph/core/mysqls/ColumnMeta.scala |   12 +-
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |    1 -
 .../s2graph/core/mysqls/ServiceColumn.scala     |   15 +-
 .../s2graph/core/parsers/WhereParser.scala      |   34 +-
 .../s2graph/core/rest/RequestParser.scala       |   10 +-
 .../apache/s2graph/core/rest/RestHandler.scala  |    4 +-
 .../apache/s2graph/core/storage/Storage.scala   |   96 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |   10 +-
 .../tall/IndexEdgeDeserializable.scala          |   10 +-
 .../wide/IndexEdgeDeserializable.scala          |    8 +-
 .../tall/SnapshotEdgeDeserializable.scala       |    4 +-
 .../wide/SnapshotEdgeDeserializable.scala       |    4 +-
 .../serde/vertex/VertexDeserializable.scala     |   23 +-
 .../serde/vertex/VertexSerializable.scala       |   13 +-
 .../s2graph/core/types/InnerValLike.scala       |   17 +
 .../apache/s2graph/core/types/VertexId.scala    |    2 +-
 .../org/apache/s2graph/core/EdgeTest.scala      |  210 ---
 .../core/Integrate/IntegrateCommon.scala        |    6 +-
 .../Integrate/tinkerpop/S2S2GraphTest.scala     |  130 ++
 .../org/apache/s2graph/core/S2EdgeTest.scala    |  210 +++
 .../s2graph/core/TestCommonWithModels.scala     |    4 +-
 .../core/benchmark/BenchmarkCommon.scala        |    2 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |    2 +-
 .../s2graph/counter/helper/CounterAdmin.scala   |    4 +-
 .../counter/core/RankingCounterSpec.scala       |    4 +-
 .../loader/core/CounterEtlFunctions.scala       |   14 +-
 .../loader/core/CounterEtlFunctionsSpec.scala   |    4 +-
 .../org/apache/s2graph/rest/netty/Server.scala  |    4 +-
 .../apache/s2graph/rest/play/Bootstrap.scala    |    6 +-
 .../s2graph/rest/play/actors/QueueActor.scala   |    8 +-
 .../rest/play/controllers/EdgeController.scala  |   12 +-
 .../play/controllers/VertexController.scala     |    4 +-
 48 files changed, 3179 insertions(+), 2726 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
index b25bc84..2352cdf 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala
@@ -48,7 +48,7 @@ object GraphConfig {
       if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL)
       else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL)
 
-    ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig)
+    ConfigFactory.parseMap(newConf).withFallback(S2Graph.DefaultConfig)
   }
 }
 
@@ -64,7 +64,7 @@ object GraphSubscriberHelper extends WithKafka {
   private val sleepPeriod = 10000
   private val maxTryNum = 10
 
-  var g: Graph = null
+  var g: S2Graph = null
   var management: Management = null
   val conns = new scala.collection.mutable.HashMap[String, Connection]()
 
@@ -80,7 +80,7 @@ object GraphSubscriberHelper extends WithKafka {
 
     if (g == null) {
       val ec = ExecutionContext.Implicits.global
-      g = new Graph(config)(ec)
+      g = new S2Graph(config)(ec)
       management = new Management(g)
     }
   }
@@ -107,12 +107,12 @@ object GraphSubscriberHelper extends WithKafka {
     (for (msg <- msgs) yield {
       statFunc("total", 1)
       g.toGraphElement(msg, labelMapping) match {
-        case Some(e) if e.isInstanceOf[Edge] =>
+        case Some(e) if e.isInstanceOf[S2Edge] =>
           statFunc("EdgeParseOk", 1)
-          e.asInstanceOf[Edge]
-        case Some(v) if v.isInstanceOf[Vertex] =>
+          e.asInstanceOf[S2Edge]
+        case Some(v) if v.isInstanceOf[S2Vertex] =>
           statFunc("VertexParseOk", 1)
-          v.asInstanceOf[Vertex]
+          v.asInstanceOf[S2Vertex]
         case Some(x) =>
           throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}")
         case None =>

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index d1da319..dce085e 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -57,7 +57,7 @@ object TransferToHFile extends SparkApp {
   /** build key values */
   case class DegreeKey(vertexIdStr: String, labelName: String, direction: String)
 
-  private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = {
+  private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = {
     val relEdges = if (createRelEdges) edge.relatedEdges else List(edge)
     buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e =>
       e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) }
@@ -125,8 +125,8 @@ object TransferToHFile extends SparkApp {
   def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = {
     val kvs = for {
       s <- strs
-      element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge]
-      edge = element.asInstanceOf[Edge]
+      element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[S2Edge]
+      edge = element.asInstanceOf[S2Edge]
       putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate)
     } yield {
         val p = putRequest

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
index 5b68754..40f936d 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala
@@ -21,7 +21,7 @@ package org.apache.s2graph.loader.subscriber
 
 import kafka.producer.KeyedMessage
 import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.S2Graph$
 import org.apache.s2graph.spark.spark.{WithKafka, SparkApp}
 import org.apache.spark.streaming.Durations._
 import org.apache.spark.streaming.kafka.HasOffsetRanges

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
index 0f69dc7..23c3cda 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala
@@ -22,7 +22,7 @@ package org.apache.s2graph.loader.subscriber
 import java.text.SimpleDateFormat
 import java.util.Date
 import kafka.serializer.StringDecoder
-import org.apache.s2graph.core.Graph
+import org.apache.s2graph.core.S2Graph$
 import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.Durations._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
deleted file mode 100644
index f10b4db..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ /dev/null
@@ -1,956 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import java.util
-import java.util.function.BiConsumer
-
-import org.apache.s2graph.core.Edge.{Props, State}
-import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
-import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.logger
-import org.apache.tinkerpop.gremlin.structure
-import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property}
-import play.api.libs.json.{JsNumber, JsObject, Json}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{Map => MutableMap}
-import scala.util.hashing.MurmurHash3
-
-case class SnapshotEdge(graph: Graph,
-                        srcVertex: Vertex,
-                        tgtVertex: Vertex,
-                        label: Label,
-                        dir: Int,
-                        op: Byte,
-                        version: Long,
-                        private val propsWithTs: Props,
-                        pendingEdgeOpt: Option[Edge],
-                        statusCode: Byte = 0,
-                        lockTs: Option[Long],
-                        tsInnerValOpt: Option[InnerValLike] = None) {
-  lazy val direction = GraphUtil.fromDirection(dir)
-  lazy val operation = GraphUtil.fromOp(op)
-  lazy val edge = toEdge
-  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
-//  if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.")
-
-//  val label = Label.findById(labelWithDir.labelId)
-  lazy val schemaVer = label.schemaVersion
-  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong
-
-  def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
-  def allPropsDeleted = Edge.allPropsDeleted(propsWithTs)
-
-  def toEdge: Edge = {
-    Edge(graph, srcVertex, tgtVertex, label, dir, op,
-      version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt,
-      statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def propsWithName = (for {
-    (_, v) <- propsWithTs.asScala
-    meta = v.labelMeta
-    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-  } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))
-
-  // only for debug
-  def toLogString() = {
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t")
-  }
-
-  def property[V](key: String, value: V, ts: Long): S2Property[V] = {
-    val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
-    val newProps = new S2Property(edge, labelMeta, key, value, ts)
-    propsWithTs.put(key, newProps)
-    newProps
-  }
-  override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case e: SnapshotEdge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
-        pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode
-    case _ => false
-  }
-
-  override def toString(): String = {
-    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction,
-      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
-      "statusCode" -> statusCode, "lockTs" -> lockTs).toString
-  }
-}
-
-case class IndexEdge(graph: Graph,
-                     srcVertex: Vertex,
-                     tgtVertex: Vertex,
-                     label: Label,
-                     dir: Int,
-                     op: Byte,
-                     version: Long,
-                     labelIndexSeq: Byte,
-                     private val propsWithTs: Props,
-                     tsInnerValOpt: Option[InnerValLike] = None)  {
-//  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")
-  //  assert(props.contains(LabelMeta.timeStampSeq))
-  lazy val direction = GraphUtil.fromDirection(dir)
-  lazy val operation = GraphUtil.fromOp(op)
-  lazy val edge = toEdge
-  lazy val labelWithDir = LabelWithDirection(label.id.get, dir)
-
-  lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in")
-  lazy val isOutEdge = !isInEdge
-
-  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong
-  lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name)
-
-  lazy val schemaVer = label.schemaVersion
-  lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
-  lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
-    val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
-    meta.seq -> innerVal
-  }.toMap
-
-  lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes
-
-  /** TODO: make sure call of this class fill props as this assumes */
-  lazy val orders = for (meta <- labelIndexMetaSeqs) yield {
-    propsWithTs.get(meta.name) match {
-      case null =>
-
-        /**
-          * TODO: agly hack
-          * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
-          */
-        val v = meta match {
-          case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer)
-          case LabelMeta.to => toEdge.tgtVertex.innerId
-          case LabelMeta.from => toEdge.srcVertex.innerId
-          // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
-          //            throw new RuntimeException("_from on indexProps is not supported")
-          case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
-        }
-
-        meta -> v
-      case v => meta -> v.innerVal
-    }
-  }
-
-  lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet
-  lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal
-
-//  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }
-
-  //TODO:
-  //  lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList
-
-  lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length
-
-  def propsWithName = for {
-    (_, v) <- propsWithTs.asScala
-    meta = v.labelMeta
-    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-  } yield meta.name -> jsValue
-
-
-  def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-
-  // only for debug
-  def toLogString() = {
-    List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
-  }
-
-  def property(key: String): Option[InnerValLikeWithTs] = {
-    label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta))
-  }
-
-  def property(labelMeta: LabelMeta): InnerValLikeWithTs = {
-//    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta))
-    if (propsWithTs.containsKey(labelMeta.name)) {
-      propsWithTs.get(labelMeta.name).innerValWithTs
-    } else {
-      label.metaPropsDefaultMapInner(labelMeta)
-    }
-  }
-
-  def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
-    if (others.isEmpty) propsWithTs
-    else {
-      val iter = others.entrySet().iterator()
-      while (iter.hasNext) {
-        val e = iter.next()
-        propsWithTs.put(e.getKey, e.getValue)
-      }
-      propsWithTs
-    }
-  }
-
-  def property[V](key: String, value: V, ts: Long): S2Property[V] = {
-    val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge."))
-    val newProps = new S2Property(edge, labelMeta, key, value, ts)
-    propsWithTs.put(key, newProps)
-    newProps
-  }
-  override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case e: IndexEdge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
-        labelIndexSeq == e.labelIndexSeq
-    case _ => false
-  }
-
-  override def toString(): String = {
-    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir,
-      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString
-    ).toString
-  }
-}
-
-case class Edge(innerGraph: Graph,
-                srcVertex: Vertex,
-                var tgtVertex: Vertex,
-                innerLabel: Label,
-                dir: Int,
-                var op: Byte = GraphUtil.defaultOpByte,
-                var version: Long = System.currentTimeMillis(),
-                propsWithTs: Props = Edge.EmptyProps,
-                parentEdges: Seq[EdgeWithScore] = Nil,
-                originalEdgeOpt: Option[Edge] = None,
-                pendingEdgeOpt: Option[Edge] = None,
-                statusCode: Byte = 0,
-                lockTs: Option[Long] = None,
-                var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge {
-
-  lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir)
-  lazy val schemaVer = innerLabel.schemaVersion
-  lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match {
-    case b: BigDecimal => b.longValue()
-    case l: Long => l
-    case i: Int => i.toLong
-    case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].")
-  }
-
-  lazy val operation = GraphUtil.fromOp(op)
-  lazy val tsInnerVal = tsInnerValOpt.get.value
-  lazy val srcId = srcVertex.innerIdVal
-  lazy val tgtId = tgtVertex.innerIdVal
-  lazy val labelName = innerLabel.label
-  lazy val direction = GraphUtil.fromDirection(dir)
-  
-  def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs)
-
-  def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq)
-
-  def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = {
-    val emptyProp = Edge.EmptyProps
-
-    propsWithTs.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
-
-    others.forEach(new BiConsumer[String, S2Property[_]] {
-      override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value)
-    })
-
-    emptyProp
-  }
-
-  def propertyValue(key: String): Option[InnerValLikeWithTs] = {
-    key match {
-      case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts))
-      case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts))
-      case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts))
-      case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts))
-      case _ =>
-        innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta))
-    }
-  }
-
-  def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= {
-    //    propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse()
-    if (propsWithTs.containsKey(labelMeta.name)) {
-      propsWithTs.get(labelMeta.name).innerValWithTs
-    } else {
-      innerLabel.metaPropsDefaultMapInner(labelMeta)
-    }
-  }
-
-  def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
-    val labelMetas = for {
-        key <- keys
-        labelMeta <- innerLabel.metaPropsInvMap.get(key)
-      } yield labelMeta
-
-    propertyValuesInner(labelMetas)
-  }
-
-  def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = {
-    if (labelMetas.isEmpty) {
-      innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }
-    } else {
-      // This is important since timestamp is required for all edges.
-      (LabelMeta.timestamp +: labelMetas).map { labelMeta =>
-        labelMeta -> propertyValueInner(labelMeta)
-      }.toMap
-    }
-  }
-
-//  if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.")
-  //  assert(propsWithTs.contains(LabelMeta.timeStampSeq))
-
-  lazy val properties = toProps()
-
-  def props = propsWithTs.asScala.mapValues(_.innerVal)
-
-
-  private def toProps(): Map[String, Any] = {
-    for {
-      (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner
-    } yield {
-      //      labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value
-      val value =
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          propsWithTs.get(labelMeta.name).value
-        } else {
-          defaultVal.innerVal.value
-        }
-      labelMeta.name -> value
-    }
-  }
-
-  def relatedEdges = {
-    if (labelWithDir.isDirected) {
-      val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false)
-      if (skipReverse) List(this) else List(this, duplicateEdge)
-    } else {
-//      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
-//      val base = copy(labelWithDir = outDir)
-      val base = copy(dir = GraphUtil.directions("out"))
-      List(base, base.reverseSrcTgtEdge)
-    }
-  }
-
-  //    def relatedEdges = List(this)
-
-  def srcForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def tgtForVertex = {
-    val belongLabelIds = Seq(labelWithDir.labelId)
-    if (labelWithDir.dir == GraphUtil.directions("in")) {
-      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
-    } else {
-      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
-    }
-  }
-
-  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge
-
-//  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)
-  def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir))
-
-  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)
-
-  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)
-
-  override def serviceName = innerLabel.serviceName
-
-  override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")
-
-  override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")
-
-  override def isAsync = innerLabel.isAsync
-
-  def isDegree = propsWithTs.containsKey(LabelMeta.degree.name)
-
-//  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
-//    case Some(_) => props
-//    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
-//  }
-
-  def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava
-
-  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
-    IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  /** force direction as out on invertedEdge */
-  def toSnapshotEdge: SnapshotEdge = {
-    val (smaller, larger) = (srcForVertex, tgtForVertex)
-
-//    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))
-
-    property(LabelMeta.timestamp.name, ts, ts)
-    val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel,
-      GraphUtil.directions("out"), op, version, propsWithTs,
-      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt)
-    ret
-  }
-
-  def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(),
-    "label" -> innerLabel.label, "service" -> innerLabel.serviceName)
-
-  def propsWithName =
-    for {
-      (_, v) <- propsWithTs.asScala
-      meta = v.labelMeta
-      jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
-    } yield meta.name -> jsValue
-
-
-  def updateTgtVertex(id: InnerValLike) = {
-    val newId = TargetVertexId(tgtVertex.id.column, id)
-    val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
-    Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
-  }
-
-  def rank(r: RankParam): Double =
-    if (r.keySeqAndWeights.size <= 0) 1.0f
-    else {
-      var sum: Double = 0
-
-      for ((labelMeta, w) <- r.keySeqAndWeights) {
-        if (propsWithTs.containsKey(labelMeta.name)) {
-          val innerValWithTs = propsWithTs.get(labelMeta.name)
-          val cost = try innerValWithTs.innerVal.toString.toDouble catch {
-            case e: Exception =>
-              logger.error("toInnerval failed in rank", e)
-              1.0
-          }
-          sum += w * cost
-        }
-      }
-      sum
-    }
-
-  def toLogString: String = {
-    val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj())
-    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t")
-  }
-
-  override def hashCode(): Int = {
-    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case e: Edge =>
-      srcVertex.innerId == e.srcVertex.innerId &&
-        tgtVertex.innerId == e.tgtVertex.innerId &&
-        labelWithDir == e.labelWithDir && op == e.op && version == e.version &&
-        pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode &&
-        parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt
-    case _ => false
-  }
-
-  override def toString(): String = {
-    Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction,
-      "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString,
-      "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs
-    ).toString
-  }
-
-  def checkProperty(key: String): Boolean = propsWithTs.containsKey(key)
-
-  def copyEdge(srcVertex: Vertex = srcVertex,
-               tgtVertex: Vertex = tgtVertex,
-               innerLabel: Label = innerLabel,
-               dir: Int = dir,
-               op: Byte = op,
-               version: Long = version,
-               propsWithTs: State = Edge.propsToState(this.propsWithTs),
-               parentEdges: Seq[EdgeWithScore] = parentEdges,
-               originalEdgeOpt: Option[Edge] = originalEdgeOpt,
-               pendingEdgeOpt: Option[Edge] = pendingEdgeOpt,
-               statusCode: Byte = statusCode,
-               lockTs: Option[Long] = lockTs,
-               tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt,
-               ts: Long = ts): Edge = {
-    val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps,
-      parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt)
-    Edge.fillPropsWithTs(edge, propsWithTs)
-    edge.property(LabelMeta.timestamp.name, ts, ts)
-    edge
-  }
-
-  def copyEdgeWithState(state: State, ts: Long): Edge = {
-    val newEdge = copy(propsWithTs = Edge.EmptyProps)
-    Edge.fillPropsWithTs(newEdge, state)
-    newEdge.property(LabelMeta.timestamp.name, ts, ts)
-    newEdge
-  }
-
-  def copyEdgeWithState(state: State): Edge = {
-    val newEdge = copy(propsWithTs = Edge.EmptyProps)
-    Edge.fillPropsWithTs(newEdge, state)
-    newEdge
-  }
-
-  override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ???
-
-  override def properties[V](strings: String*): util.Iterator[Property[V]] = ???
-
-  override def property[V](key: String): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
-    if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]]
-    else {
-      val default = innerLabel.metaPropsDefaultMapInner(labelMeta)
-      property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]]
-    }
-  }
-
-  override def property[V](key: String, value: V): Property[V] = {
-    property(key, value, System.currentTimeMillis())
-  }
-
-  def property[V](key: String, value: V, ts: Long): Property[V] = {
-    val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge."))
-    val newProp = new S2Property[V](this, labelMeta, key, value, ts)
-    propsWithTs.put(key, newProp)
-    newProp
-  }
-
-  override def remove(): Unit = {}
-
-  override def graph(): TpGraph = innerGraph
-
-  override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId)
-
-  override def label(): String = innerLabel.label
-}
-
-
-case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
-                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
-                      newSnapshotEdge: Option[SnapshotEdge] = None) {
-
-  def toLogString: String = {
-    val l = (0 until 50).map(_ => "-").mkString("")
-    val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}"
-    val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}"
-    val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}"
-
-    List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n")
-  }
-}
-
-object Edge {
-  val incrementVersion = 1L
-  val minTsVal = 0L
-
-  /** now version information is required also **/
-  type Props = java.util.Map[String, S2Property[_]]
-  type State = Map[LabelMeta, InnerValLikeWithTs]
-  type PropsPairWithTs = (State, State, Long, String)
-  type MergeState = PropsPairWithTs => (State, Boolean)
-  type UpdateFunc = (Option[Edge], Edge, MergeState)
-
-  def EmptyProps = new java.util.HashMap[String, S2Property[_]]
-  def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs]
-  def sameProps(base: Props, other: Props): Boolean = {
-    if (base.size != other.size) false
-    else {
-      var ret = true
-      val iter = base.entrySet().iterator()
-      while (iter.hasNext) {
-        val e = iter.next()
-        if (!other.containsKey(e.getKey)) ret = false
-        else if (e.getValue != other.get(e.getKey)) ret = false
-        else {
-
-        }
-      }
-      val otherIter = other.entrySet().iterator()
-      while (otherIter.hasNext) {
-        val e = otherIter.next()
-        if (!base.containsKey(e.getKey)) ret = false
-        else if (e.getValue != base.get(e.getKey)) ret = false
-        else {
-
-        }
-      }
-      ret
-    }
-//    base.sameElements(other)
-  }
-  def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = {
-    state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) }
-  }
-  def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = {
-    state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) }
-  }
-  def fillPropsWithTs(edge: Edge, state: State): Unit = {
-    state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) }
-  }
-
-  def propsToState(props: Props): State = {
-    props.asScala.map { case (k, v) =>
-      v.labelMeta -> v.innerValWithTs
-    }.toMap
-  }
-
-  def stateToProps(edge: Edge, state: State): Props = {
-    state.foreach { case (k, v) =>
-      edge.property(k.name, v.innerVal.value, v.ts)
-    }
-    edge.propsWithTs
-  }
-
-  def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean =
-    if (!props.contains(LabelMeta.lastDeletedAt)) false
-    else {
-      val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts
-      val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
-
-      propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
-    }
-
-  def allPropsDeleted(props: Props): Boolean =
-    if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false
-    else {
-      val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts
-      props.remove(LabelMeta.lastDeletedAt.name)
-//      val propsWithoutLastDeletedAt = props
-//
-//      propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt }
-      var ret = true
-      val iter = props.entrySet().iterator()
-      while (iter.hasNext) {
-        val e = iter.next()
-        if (e.getValue.ts > lastDeletedAt) ret = false
-      }
-      ret
-    }
-
-  def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
-    //    assert(invertedEdge.isEmpty)
-    //    assert(requestEdge.op == GraphUtil.operations("delete"))
-
-    val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
-    val edgeInverted = Option(requestEdge.toSnapshotEdge)
-
-    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted))
-  }
-
-  def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
-    //            logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}")
-    //            logger.debug(s"requestEdge: ${requestEdge.toStringRaw}")
-    val oldPropsWithTs =
-      if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs]
-      else propsToState(invertedEdge.get.propsWithTs)
-
-    val funcs = requestEdges.map { edge =>
-      if (edge.op == GraphUtil.operations("insert")) {
-        edge.innerLabel.consistencyLevel match {
-          case "strong" => Edge.mergeUpsert _
-          case _ => Edge.mergeInsertBulk _
-        }
-      } else if (edge.op == GraphUtil.operations("insertBulk")) {
-        Edge.mergeInsertBulk _
-      } else if (edge.op == GraphUtil.operations("delete")) {
-        edge.innerLabel.consistencyLevel match {
-          case "strong" => Edge.mergeDelete _
-          case _ => throw new RuntimeException("not supported")
-        }
-      }
-      else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
-      else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _
-      else throw new RuntimeException(s"not supported operation on edge: $edge")
-    }
-
-    val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
-    val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)
-
-    if (requestWithFuncs.isEmpty) {
-      (requestEdges.head, EdgeMutate())
-    } else {
-      val requestEdge = requestWithFuncs.last._1
-      var prevPropsWithTs = oldPropsWithTs
-
-      for {
-        (requestEdge, func) <- requestWithFuncs
-      } {
-        val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)
-        prevPropsWithTs = _newPropsWithTs
-        //        logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n")
-      }
-      val requestTs = requestEdge.ts
-      /** version should be monotoniously increasing so our RPC mutation should be applied safely */
-      val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
-      val maxTs = prevPropsWithTs.map(_._2.ts).max
-      val newTs = if (maxTs > requestTs) maxTs else requestTs
-      val propsWithTs = prevPropsWithTs ++
-        Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs))
-
-      val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)
-
-      //      logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}")
-      //      logger.error(s"$propsWithTs")
-      val newEdge = requestEdge.copy(propsWithTs = EmptyProps)
-      fillPropsWithTs(newEdge, propsWithTs)
-      (newEdge, edgeMutate)
-    }
-  }
-
-  def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie =>
-    ie.labelIndex.dir match {
-      case None =>
-        // both direction use same indices that is defined when label creation.
-        true
-      case Some(dir) =>
-        if (dir != ie.dir) {
-          // current labelIndex's direction is different with indexEdge's direction so don't touch
-          false
-        } else {
-          ie.labelIndex.writeOption.map { option =>
-            val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong }
-            option.sample(ie, hashValueOpt)
-          }.getOrElse(true)
-        }
-    }
-  }
-
-  def buildMutation(snapshotEdgeOpt: Option[Edge],
-                    requestEdge: Edge,
-                    newVersion: Long,
-                    oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs],
-                    newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = {
-
-    if (oldPropsWithTs == newPropsWithTs) {
-      // all requests should be dropped. so empty mutation.
-      EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
-    } else {
-      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq)
-      val newOp = snapshotEdgeOpt match {
-        case None => requestEdge.op
-        case Some(old) =>
-          val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max
-          if (oldMaxTs > requestEdge.ts) old.op
-          else requestEdge.op
-      }
-
-      val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs)
-
-      val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge)
-      // delete request must always update snapshot.
-      if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
-        // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
-        EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
-      } else {
-        val edgesToDelete = snapshotEdgeOpt match {
-          case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
-            snapshotEdge.copy(op = GraphUtil.defaultOpByte)
-              .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
-          case _ => Nil
-        }
-
-        val edgesToInsert =
-          if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
-          else {
-            val newEdge = requestEdge.copy(
-              version = newVersion,
-              propsWithTs = Edge.EmptyProps,
-              op = GraphUtil.defaultOpByte
-            )
-            newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) }
-
-            newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) }
-          }
-
-
-        EdgeMutate(edgesToDelete = edgesToDelete,
-          edgesToInsert = edgesToInsert,
-          newSnapshotEdge = newSnapshotEdgeOpt)
-      }
-    }
-  }
-
-  def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-          else {
-            shouldReplace = true
-            newValWithTs
-          }
-          Some(k -> v)
-
-        case None =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs)
-          else {
-            shouldReplace = true
-            None
-          }
-      }
-    }
-    val existInNew =
-      for {
-        (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
-      } yield {
-        shouldReplace = true
-        Some(k -> newValWithTs)
-      }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-          else {
-            shouldReplace = true
-            newValWithTs
-          }
-          Some(k -> v)
-        case None =>
-          // important: update need to merge previous valid values.
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          Some(k -> oldValWithTs)
-      }
-    }
-    val existInNew = for {
-      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
-    } yield {
-      shouldReplace = true
-      Some(k -> newValWithTs)
-    }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      propsWithTs.get(k) match {
-        case Some(newValWithTs) =>
-          if (k == LabelMeta.timestamp) {
-            val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
-            else {
-              shouldReplace = true
-              newValWithTs
-            }
-            Some(k -> v)
-          } else {
-            if (oldValWithTs.ts >= newValWithTs.ts) {
-              Some(k -> oldValWithTs)
-            } else {
-              assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
-              shouldReplace = true
-              // incr(t0), incr(t2), d(t1) => deleted
-              Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
-            }
-          }
-
-        case None =>
-          assert(oldValWithTs.ts >= lastDeletedAt)
-          Some(k -> oldValWithTs)
-        //          if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
-      }
-    }
-    val existInNew = for {
-      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
-    } yield {
-      shouldReplace = true
-      Some(k -> newValWithTs)
-    }
-
-    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
-  }
-
-  def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    var shouldReplace = false
-    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
-    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
-      case Some(prevDeletedAt) =>
-        if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
-        else {
-          shouldReplace = true
-          requestTs
-        }
-      case None => {
-        shouldReplace = true
-        requestTs
-      }
-    }
-    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
-      if (k == LabelMeta.timestamp) {
-        if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
-        else {
-          shouldReplace = true
-          Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
-        }
-      } else {
-        if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
-        else {
-          shouldReplace = true
-          None
-        }
-      }
-    }
-    val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
-    ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
-  }
-
-  def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
-    val (_, propsWithTs, _, _) = propsPairWithTs
-    (propsWithTs, true)
-  }
-
-//  def fromString(s: String): Option[Edge] = Graph.toEdge(s)
-
-
-}