Posted to commits@s2graph.apache.org by st...@apache.org on 2017/10/30 10:23:32 UTC

[4/8] incubator-s2graph git commit: Separate interfaces from Storage.

Separate interfaces from Storage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/39544dc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/39544dc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/39544dc5

Branch: refs/heads/master
Commit: 39544dc5debd4821c4f73db17e88bdbeb9f43b38
Parents: 3361320
Author: DO YUNG YOON <st...@apache.org>
Authored: Sat Oct 28 09:27:51 2017 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Sat Oct 28 09:27:51 2017 +0900

----------------------------------------------------------------------
 .gitignore                                      |    2 +-
 s2core/build.sbt                                |    3 +-
 .../org/apache/s2graph/core/Management.scala    |   74 +-
 .../org/apache/s2graph/core/QueryParam.scala    |    2 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala  |    2 +-
 .../scala/org/apache/s2graph/core/S2Graph.scala |  275 +++--
 .../org/apache/s2graph/core/S2Vertex.scala      |   40 +-
 .../s2graph/core/storage/Deserializable.scala   |   43 -
 .../s2graph/core/storage/MutateResponse.scala   |   12 +
 .../apache/s2graph/core/storage/SKeyValue.scala |    1 +
 .../s2graph/core/storage/Serializable.scala     |   27 -
 .../apache/s2graph/core/storage/Storage.scala   | 1146 ++----------------
 .../core/storage/StorageDeserializable.scala    |  120 --
 .../apache/s2graph/core/storage/StorageIO.scala |  241 ++++
 .../core/storage/StorageManagement.scala        |   35 +
 .../s2graph/core/storage/StorageReadable.scala  |   62 +
 .../s2graph/core/storage/StorageSerDe.scala     |   59 +
 .../core/storage/StorageSerializable.scala      |   82 --
 .../s2graph/core/storage/StorageWritable.scala  |   45 +
 .../storage/WriteWriteConflictResolver.scala    |  438 +++++++
 .../core/storage/hbase/AsynchbaseStorage.scala  |  764 +-----------
 .../hbase/AsynchbaseStorageManagement.scala     |  263 ++++
 .../hbase/AsynchbaseStorageReadable.scala       |  335 +++++
 .../storage/hbase/AsynchbaseStorageSerDe.scala  |   68 ++
 .../hbase/AsynchbaseStorageWritable.scala       |  118 ++
 .../core/storage/serde/Deserializable.scala     |   41 +
 .../core/storage/serde/Serializable.scala       |   27 +
 .../storage/serde/StorageDeserializable.scala   |  144 +++
 .../storage/serde/StorageSerializable.scala     |   90 ++
 .../tall/IndexEdgeDeserializable.scala          |   12 +-
 .../indexedge/tall/IndexEdgeSerializable.scala  |    5 +-
 .../wide/IndexEdgeDeserializable.scala          |    4 +-
 .../indexedge/wide/IndexEdgeSerializable.scala  |    5 +-
 .../tall/SnapshotEdgeDeserializable.scala       |    7 +-
 .../tall/SnapshotEdgeSerializable.scala         |    4 +-
 .../wide/SnapshotEdgeDeserializable.scala       |    9 +-
 .../wide/SnapshotEdgeSerializable.scala         |    4 +-
 .../serde/vertex/VertexDeserializable.scala     |  146 +--
 .../serde/vertex/VertexSerializable.scala       |  114 +-
 .../vertex/tall/VertexDeserializable.scala      |   58 +
 .../serde/vertex/tall/VertexSerializable.scala  |   54 +
 .../vertex/wide/VertexDeserializable.scala      |   73 ++
 .../serde/vertex/wide/VertexSerializable.scala  |   52 +
 .../apache/s2graph/core/utils/DeferCache.scala  |    8 +-
 .../s2graph/core/Integrate/CrudTest.scala       |    8 +-
 .../LabelLabelIndexMutateOptionTest.scala       |    6 +-
 .../s2graph/core/storage/StorageIOTest.scala    |   59 +
 .../core/storage/hbase/IndexEdgeTest.scala      |    3 +-
 .../core/storage/rocks/RocksStorageTest.scala   |   33 +
 .../rest/play/controllers/EdgeController.scala  |   11 +-
 .../play/controllers/VertexController.scala     |    5 +-
 51 files changed, 2913 insertions(+), 2326 deletions(-)
----------------------------------------------------------------------
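
For orientation, the roughly 1,100-line Storage class is carved up along the new files
listed above. Below is a minimal sketch of the resulting composition, with illustrative
member names and groupings rather than the exact signatures introduced by this commit:

  // Hypothetical sketch only; the real traits live in StorageManagement.scala,
  // StorageSerDe.scala, StorageReadable.scala, StorageWritable.scala and StorageIO.scala.
  import com.typesafe.config.Config
  import scala.concurrent.Future
  import org.apache.s2graph.core.S2Vertex
  import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
  import org.apache.s2graph.core.storage.serde.Deserializable

  trait StorageManagement  { def createTable(config: Config, tableName: String): Unit }
  trait StorageWritable    { def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean): Future[MutateResponse] }
  trait StorageSerDe       { def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] }
  trait StorageReadable[Q] { def fetchKeyValues(rpc: Q): Future[Seq[SKeyValue]] }

  // Storage becomes a thin facade that delegates to the pieces above instead of
  // implementing read, write, serde and table management itself; the new
  // AsynchbaseStorage* files supply the HBase-specific implementations.
  abstract class Storage[Q](val management: StorageManagement,
                            val serDe: StorageSerDe,
                            val reader: StorageReadable[Q],
                            val writer: StorageWritable)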


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 9f295f2..ad5a5e9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -107,4 +107,4 @@ server.pid
 .cache
 
 ### Local Embedded HBase Data ###
-storage/
+#storage/

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 8033581..e7e602f 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -48,7 +48,8 @@ libraryDependencies ++= Seq(
   "org.specs2" %% "specs2-core" % specs2Version % "test",
   "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion ,
   "org.apache.lucene" % "lucene-core" % "6.6.0",
-  "org.apache.lucene" % "lucene-queryparser" % "6.6.0"
+  "org.apache.lucene" % "lucene-queryparser" % "6.6.0",
+  "org.rocksdb" % "rocksdbjni" % "5.8.0"
 )
 
 libraryDependencies := {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index a9741d2..49d3c0e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core
 
 import java.util
 
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException}
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.mysqls._
@@ -36,9 +37,22 @@ import scala.util.Try
  * s2core never use this for finding models.
  */
 object Management {
-
+  import HBaseType._
   import scala.collection.JavaConversions._
 
+  val ZookeeperQuorum = "hbase.zookeeper.quorum"
+  val ColumnFamilies = "hbase.table.column.family"
+  val RegionMultiplier = "hbase.table.region.multiplier"
+  val Ttl = "hbase.table.ttl"
+  val CompressionAlgorithm = "hbase.table.compression.algorithm"
+  val ReplicationScope = "hbase.table.replication.scope"
+  val TotalRegionCount = "hbase.table.total.region.count"
+
+  val DefaultColumnFamilies = Seq("e", "v")
+  val DefaultCompressionAlgorithm = "gz"
+  val LABEL_NAME_MAX_LENGTH = 100
+
+
   def newProp(name: String, defaultValue: String, datatType: String): Prop = {
     new Prop(name, defaultValue, datatType)
   }
@@ -56,10 +70,6 @@ object Management {
     case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None)
   }
 
-  import HBaseType._
-
-  val LABEL_NAME_MAX_LENGTH = 100
-  val DefaultCompressionAlgorithm = "gz"
 
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
@@ -263,6 +273,24 @@ object Management {
       Label.updateName(tempLabel, rightLabel)
     }
   }
+  def toConfig(params: Map[String, Any]): Config = {
+    import scala.collection.JavaConversions._
+
+    val filtered = params.filter { case (k, v) =>
+        v match {
+          case None => false
+          case _ => true
+        }
+    }.map { case (k, v) =>
+        val newV = v match {
+          case Some(value) => value
+          case _ => v
+        }
+        k -> newV
+    }
+
+    ConfigFactory.parseMap(filtered)
+  }
 }
 
 class Management(graph: S2Graph) {
@@ -277,7 +305,15 @@ class Management(graph: S2Graph) {
                   compressionAlgorithm: String = DefaultCompressionAlgorithm,
                   replicationScopeOpt: Option[Int] = None,
                   totalRegionCount: Option[Int] = None): Unit = {
-    graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount)
+    val config = toConfig(Map(
+      ZookeeperQuorum -> zkAddr,
+//      ColumnFamilies -> cfs,
+      RegionMultiplier -> regionMultiplier,
+      Ttl -> ttl,
+      CompressionAlgorithm -> compressionAlgorithm,
+      TotalRegionCount -> totalRegionCount
+    ))
+    graph.defaultStorage.createTable(config, tableName)
   }
 
 
@@ -299,8 +335,15 @@ class Management(graph: S2Graph) {
 
     Model withTx { implicit session =>
       val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false)
+      val config = toConfig(Map(
+        ZookeeperQuorum -> service.cluster,
+//        ColumnFamilies -> List("e", "v"),
+        RegionMultiplier -> service.preSplitSize,
+        Ttl -> service.hTableTTL,
+        CompressionAlgorithm -> compressionAlgorithm
+      ))
       /* create hbase table for service */
-      graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
+      graph.getStorage(service).createTable(config, service.hTableName)
       service
     }
   }
@@ -390,7 +433,14 @@ class Management(graph: S2Graph) {
       /* create hbase table */
       val storage = graph.getStorage(newLabel)
       val service = newLabel.service
-      storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm)
+      val config = toConfig(Map(
+        ZookeeperQuorum -> service.cluster,
+//        ColumnFamilies -> List("e", "v"),
+        RegionMultiplier -> service.preSplitSize,
+        Ttl -> newLabel.hTableTTL,
+        CompressionAlgorithm -> newLabel.compressionAlgorithm
+      ))
+      storage.createTable(config, newLabel.hbaseTableName)
 
       newLabel
     }
@@ -449,7 +499,9 @@ class Management(graph: S2Graph) {
       labelOpt.map { label =>
         val storage = graph.getStorage(label)
         val zkAddr = label.service.cluster
-        storage.truncateTable(zkAddr, label.hbaseTableName)
+
+        val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
+        storage.truncateTable(config, label.hbaseTableName)
       }
     }
   }
@@ -459,7 +511,9 @@ class Management(graph: S2Graph) {
       labelOpt.map { label =>
         val storage = graph.getStorage(label)
         val zkAddr = label.service.cluster
-        storage.deleteTable(zkAddr, label.hbaseTableName)
+
+        val config = toConfig(Map(ZookeeperQuorum -> zkAddr))
+        storage.deleteTable(config, label.hbaseTableName)
       }
     }
   }
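
The table-management calls above now take a single typesafe Config instead of a long
positional parameter list, built through the new Management.toConfig helper (None values
are dropped and Some values unwrapped before ConfigFactory.parseMap). A minimal call-site
sketch, assuming an existing S2Graph instance named graph:

  import org.apache.s2graph.core.Management._

  // keys come from the constants added to object Management above
  val tableConfig = toConfig(Map(
    ZookeeperQuorum      -> "localhost",
    RegionMultiplier     -> 1,
    Ttl                  -> None,                        // filtered out by toConfig
    CompressionAlgorithm -> DefaultCompressionAlgorithm  // "gz"
  ))

  // the storage implementation reads whichever keys it understands from the Config
  graph.defaultStorage.createTable(tableConfig, "test_table")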

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
index 7e95f58..1100f6c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala
@@ -26,7 +26,7 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.rest.TemplateHelper
-import org.apache.s2graph.core.storage.StorageSerializable._
+import org.apache.s2graph.core.storage.serde.StorageSerializable._
 import org.apache.s2graph.core.types._
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
 import org.hbase.async.ColumnRangeFilter

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
index 7165579..51af831 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala
@@ -684,7 +684,7 @@ case class S2Edge(innerGraph: S2Graph,
       // should we delete related edges also?
       val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true)
       val mutateSuccess = Await.result(future, innerGraph.WaitTimeout)
-      if (!mutateSuccess.forall(identity)) throw new RuntimeException("edge remove failed.")
+      if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.")
     } else {
       throw Edge.Exceptions.edgeRemovalNotSupported()
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index d1eda5e..32724b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -22,18 +22,17 @@ package org.apache.s2graph.core
 import java.util
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.{Executors, TimeUnit}
-import java.util.function.Consumer
-
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.{BaseConfiguration, Configuration}
-import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException}
+import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core.features.S2GraphVariables
-import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider}
+import org.apache.s2graph.core.index.IndexProvider
 import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
 import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
-import org.apache.s2graph.core.storage.{SKeyValue, Storage}
+import org.apache.s2graph.core.storage.rocks.RocksStorage
+import org.apache.s2graph.core.storage.{ MutateResponse, SKeyValue, Storage}
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
@@ -44,8 +43,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables}
 import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper}
 import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex}
 import play.api.libs.json.{JsObject, Json}
-import scalikejdbc.DBSession
-
 import scala.annotation.tailrec
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -137,12 +134,13 @@ object S2Graph {
     new S2Graph(configuration)(ec)
   }
 
-  def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = {
+  def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = {
     val storageBackend = config.getString("s2graph.storage.backend")
     logger.info(s"[InitStorage]: $storageBackend")
 
     storageBackend match {
-      case "hbase" => new AsynchbaseStorage(graph, config)(ec)
+      case "hbase" => new AsynchbaseStorage(graph, config)
+      case "rocks" => new RocksStorage(graph, config)
       case _ => throw new RuntimeException("not supported storage.")
     }
   }
@@ -912,7 +910,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   /**
     * TODO: we need to some way to handle malformed configuration for storage.
     */
-  val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = {
+  val storagePool: scala.collection.mutable.Map[String, Storage[_]] = {
     val labels = Label.findAll()
     val services = Service.findAll()
 
@@ -923,12 +921,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       confWithFallback(conf)
     }.toSet
 
-    val pools = new java.util.HashMap[Config, Storage[_, _]]()
+    val pools = new java.util.HashMap[Config, Storage[_]]()
     configs.foreach { config =>
       pools.put(config, S2Graph.initStorage(this, config)(ec))
     }
 
-    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]()
+    val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]()
 
     labels.foreach { label =>
       if (label.storageConfigOpt.isDefined) {
@@ -945,7 +943,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     m
   }
 
-  val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec)
+  val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec)
 
   /** QueryLevel FutureCache */
   val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
@@ -957,11 +955,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   val indexProvider = IndexProvider.apply(config)
 
-  def getStorage(service: Service): Storage[_, _] = {
+  def getStorage(service: Service): Storage[_] = {
     storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
   }
 
-  def getStorage(label: Label): Storage[_, _] = {
+  def getStorage(label: Label): Storage[_] = {
     storagePool.getOrElse(s"label:${label.label}", defaultStorage)
   }
 
@@ -979,8 +977,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     val futures = for {
       edge <- edges
     } yield {
-      fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) =>
-        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+      getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) =>
+        edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
       }
     }
 
@@ -1147,39 +1145,33 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
         fallback
     } get
   }
+  
+  def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+    def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+      def fromResult(kvs: Seq[SKeyValue],
+                     version: String): Option[S2Vertex] = {
+        if (kvs.isEmpty) None
+        else storage.vertexDeserializer(version).fromKeyValues(kvs, None)
+        //        .map(S2Vertex(graph, _))
+      }
 
-
-  def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
-    /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache
-      * so use empty cacheKey.
-      * */
-    val queryParam = QueryParam(labelName = edge.innerLabel.label,
-      direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
-      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
-      cacheTTLInMillis = -1)
-    val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
-    val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
-
-    val storage = getStorage(edge.innerLabel)
-    storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs =>
-      val (edgeOpt, kvOpt) =
-        if (kvs.isEmpty) (None, None)
-        else {
-          val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
-          val _kvOpt = kvs.headOption
-          (snapshotEdgeOpt, _kvOpt)
+      val futures = vertices.map { vertex =>
+        val queryParam = QueryParam.Empty
+        val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+        val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+        val rpc = storage.buildRequest(queryRequest, vertex)
+        storage.fetchKeyValues(rpc).map { kvs =>
+          fromResult(kvs, vertex.serviceColumn.schemaVersion)
+        } recoverWith { case ex: Throwable =>
+          Future.successful(None)
         }
-      (queryParam, edgeOpt, kvOpt)
-    } recoverWith { case ex: Throwable =>
-      logger.error(s"fetchQueryParam failed. fallback return.", ex)
-      throw FetchTimeoutException(s"${edge.toLogString}")
-    }
-  }
+      }
 
-  def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
+      Future.sequence(futures).map { result => result.toList.flatten }
+    }
     val verticesWithIdx = vertices.zipWithIndex
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+      getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
     }
 
     Future.sequence(futures).map { ls =>
@@ -1221,8 +1213,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   }
 
   def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = {
+    val futures = queries.map(getEdgesStepInner(_, true))
     val future = for {
-      stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true)))
+      stepInnerResultLs <- Future.sequence(futures)
       (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs)
     } yield {
       //        logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}")
@@ -1241,6 +1234,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
   def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult],
                               requestTs: Long): Future[(Boolean, Boolean)] = {
     stepInnerResultLs.foreach { stepInnerResult =>
+      logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}")
       if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.")
     }
     val futures = for {
@@ -1257,9 +1251,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
               * read: snapshotEdge on queryResult = O(N)
               * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge))
               */
-            mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity))
+            mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
           } else {
-            getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+            deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
           }
         case _ =>
 
@@ -1267,7 +1261,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
             * read: x
             * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
             */
-          getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum)
+          deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum)
       }
       ret
     }
@@ -1280,6 +1274,44 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
+  private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult,
+                                                         requestTs: Long,
+                                                         retryNum: Int): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+        val score = edgeWithScore.score
+
+        val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+          storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            storage.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            storage.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        storage.writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
   def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = {
     val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore =>
       (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree
@@ -1297,20 +1329,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
             case _ =>
               edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs)
           }
-//        val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match {
-//          case "strong" =>
-//            val edge = edgeWithScore.edge
-//            edge.property(LabelMeta.timestamp.name, requestTs)
-//            val _newPropsWithTs = edge.updatePropsWithTs()
-//
-//            (GraphUtil.operations("delete"), requestTs, _newPropsWithTs)
-//          case _ =>
-//            val oldEdge = edgeWithScore.edge
-//            (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs())
-//        }
-//
-//        val copiedEdge =
-//          edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs)
 
         val edgeToDelete = edgeWithScore.copy(edge = copiedEdge)
         //      logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}")
@@ -1321,11 +1339,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     }
   }
 
-  //  def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] =
-  //    storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts)
 
   def mutateElements(elements: Seq[GraphElement],
-                     withWait: Boolean = false): Future[Seq[Boolean]] = {
+                     withWait: Boolean = false): Future[Seq[MutateResponse]] = {
 
     val edgeBuffer = ArrayBuffer[(S2Edge, Int)]()
     val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]()
@@ -1355,7 +1371,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   //  def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait)
 
-  def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = {
+  def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
     val edgeWithIdxs = edges.zipWithIndex
 
     val (strongEdges, weakEdges) =
@@ -1383,7 +1399,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
         }
 
         storage.writeToStorage(zkQuorum, mutations, withWait).map { ret =>
-          idxs.map(idx => idx -> ret)
+          idxs.map(idx => idx -> ret.isSuccess)
         }
       }
       Future.sequence(futures)
@@ -1398,7 +1414,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       val edges = edgeGroup.map(_._1)
       val idxs = edgeGroup.map(_._2)
       val storage = getStorage(label)
-      storage.mutateStrongEdges(edges, withWait = true).map { rets =>
+      mutateStrongEdges(storage)(edges, withWait = true).map { rets =>
         idxs.zip(rets)
       }
     }
@@ -1408,27 +1424,130 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
       deleteAll <- Future.sequence(deleteAllFutures)
       strong <- Future.sequence(strongEdgesFutures)
     } yield {
-      (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2)
+      (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2))
     }
   }
 
-  def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = {
+  private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = storage.buildVertexPutsAsync(head)
+          val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+      //        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map( idx => idx -> ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+
+  private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge],
+                       checkConsistency: Boolean,
+                       withWait: Boolean): Future[MutateResponse] = {
+    assert(edges.nonEmpty)
+    // TODO:: remove after code review: unreachable code
+    if (!checkConsistency) {
+
+      val zkQuorum = edges.head.innerLabel.hbaseZkAddr
+      val futures = edges.map { edge =>
+        val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+        val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy)
+        val mutations =
+          storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+        if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+        storage.writeToStorage(zkQuorum, mutations, withWait)
+      }
+      Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+    } else {
+      storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+        storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+      }
+    }
+  }
+
+  def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+    def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
+      if (vertex.op == GraphUtil.operations("delete")) {
+        storage.writeToStorage(vertex.hbaseZkAddr,
+          storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+      } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+        logger.info(s"deleteAll for vertex is truncated. $vertex")
+        Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+      } else {
+        storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait)
+      }
+    }
+
+    def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex],
+                       withWait: Boolean = false): Future[Seq[MutateResponse]] = {
+      val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) }
+      Future.sequence(futures)
+    }
+
     val verticesWithIdx = vertices.zipWithIndex
     val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
-      getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
+      mutateVertices(getStorage(service))(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
     }
     Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
   }
 
-  def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+
+
+  def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+    def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+      val futures = for {
+        edge <- edges
+      } yield {
+        val kvs = for {
+          relEdge <- edge.relatedEdges
+          edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+        } yield {
+          val countWithTs = edge.propertyValueInner(LabelMeta.count)
+          val countVal = countWithTs.innerVal.toString().toLong
+          storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+        }
+        storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait)
+      }
+
+      Future.sequence(futures)
+    }
+
     val edgesWithIdx = edges.zipWithIndex
     val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
-      getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+      incrementCounts(getStorage(label))(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+    }
+    Future.sequence(futures).map { ls =>
+      ls.flatten.toSeq.sortBy(_._2).map(_._1)
     }
-    Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) }
   }
 
-  def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = {
+  def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = {
     val label = edge.innerLabel
 
     val storage = getStorage(label)
@@ -1840,7 +1959,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
     val vertex = newVertex(id, ts, props, op, belongLabelIds)
 
     val future = mutateVertices(Seq(vertex), withWait = true).map { rets =>
-      if (rets.forall(identity)) vertex
+      if (rets.forall(_.isSuccess)) vertex
       else throw new RuntimeException("addVertex failed.")
     }
     Await.ready(future, WaitTimeout)
@@ -1850,7 +1969,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
 
   def addVertexInner(vertex: S2Vertex): S2Vertex = {
     val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets =>
-      if (rets.forall(identity)) {
+      if (rets.forall(_.isSuccess)) {
         indexProvider.mutateVerticesAsync(Seq(vertex))
       } else throw new RuntimeException("addVertex failed.")
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
index c0dc23b..177d859 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala
@@ -169,6 +169,29 @@ case class S2Vertex(graph: S2Graph,
     graph.fetchEdges(this, labelNameWithDirs.distinct)
   }
 
+  private def edgesAsync(direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = {
+    val labelNameWithDirs =
+      if (labelNames.isEmpty) {
+        // TODO: Let's clarify direction
+        if (direction == Direction.BOTH) {
+          Label.findBySrcColumnId(id.colId).map(l => l.label -> Direction.OUT.name) ++
+            Label.findByTgtColumnId(id.colId).map(l => l.label -> Direction.IN.name)
+        } else if (direction == Direction.IN) {
+          Label.findByTgtColumnId(id.colId).map(l => l.label -> direction.name)
+        } else {
+          Label.findBySrcColumnId(id.colId).map(l => l.label -> direction.name)
+        }
+      } else {
+        direction match {
+          case Direction.BOTH =>
+            Seq(Direction.OUT, Direction.IN).flatMap { dir => labelNames.map(_ -> dir.name()) }
+          case _ => labelNames.map(_ -> direction.name())
+        }
+      }
+
+    graph.fetchEdgesAsync(this, labelNameWithDirs.distinct)
+  }
+
   // do no save to storage
   def propertyInner[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = {
     S2Property.assertValidProp(key, value)
@@ -242,21 +265,18 @@ case class S2Vertex(graph: S2Graph,
       // remove edge
       // TODO: remove related edges also.
       implicit val ec = graph.ec
-      val ts = System.currentTimeMillis()
-      val outLabels = Label.findBySrcColumnId(id.colId)
-      val inLabels = Label.findByTgtColumnId(id.colId)
+
       val verticesToDelete = Seq(this.copy(op = GraphUtil.operations("delete")))
-      val outFuture = graph.deleteAllAdjacentEdges(verticesToDelete, outLabels, GraphUtil.directions("out"), ts)
-      val inFuture = graph.deleteAllAdjacentEdges(verticesToDelete, inLabels, GraphUtil.directions("in"), ts)
+
       val vertexFuture = graph.mutateVertices(verticesToDelete, withWait = true)
+
       val future = for {
-        outSuccess <- outFuture
-        inSuccess <- inFuture
         vertexSuccess <- vertexFuture
+        edges <- edgesAsync(Direction.BOTH)
       } yield {
-        if (!outSuccess) throw new RuntimeException("Vertex.remove out direction edge delete failed.")
-        if (!inSuccess) throw new RuntimeException("Vertex.remove in direction edge delete failed.")
-        if (!vertexSuccess.forall(identity)) throw new RuntimeException("Vertex.remove vertex delete failed.")
+        edges.asScala.toSeq.foreach { edge => edge.remove() }
+        if (!vertexSuccess.forall(_.isSuccess)) throw new RuntimeException("Vertex.remove vertex delete failed.")
+
         true
       }
       Await.result(future, graph.WaitTimeout)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
deleted file mode 100644
index af20483..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, VertexId}
-
-
-trait Deserializable[E] extends StorageDeserializable[E] {
-  import StorageDeserializable._
-
-  type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int)
-
-//  /** version 1 and version 2 share same code for parsing row key part */
-//  def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION): RowKeyRaw = {
-//    var pos = 0
-//    val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version)
-//    pos += srcIdLen
-//    val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
-//    pos += 4
-//    val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
-//
-//    val rowLen = srcIdLen + 4 + 1
-//    (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen)
-//  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
new file mode 100644
index 0000000..bed1152
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala
@@ -0,0 +1,12 @@
+package org.apache.s2graph.core.storage
+
+object MutateResponse {
+  val Success = new MutateResponse(isSuccess = true)
+  val Failure = new MutateResponse(isSuccess = false)
+  val IncrementFailure = new IncrementResponse(isSuccess = false, -1, -1)
+  val IncrementSuccess = new IncrementResponse(isSuccess = true, -1, -1)
+}
+
+class MutateResponse(val isSuccess: Boolean)
+
+case class IncrementResponse(override val isSuccess: Boolean, afterCount: Long, beforeCount: Long) extends MutateResponse(isSuccess)
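
MutateResponse replaces the bare Boolean that mutation paths used to return, which is why
call sites elsewhere in this commit change from rets.forall(identity) to
rets.forall(_.isSuccess); IncrementResponse additionally carries the counter values. A
small illustrative usage:

  import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse}

  val rets: Seq[MutateResponse] = Seq(
    MutateResponse.Success,
    IncrementResponse(isSuccess = true, afterCount = 11L, beforeCount = 10L)
  )

  // the new idiom used throughout S2Graph/S2Edge/S2Vertex in this commit
  val allSucceeded = rets.forall(_.isSuccess)

  // increment responses can expose their counter delta when callers need it
  val deltas = rets.collect { case inc: IncrementResponse => inc.afterCount - inc.beforeCount }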

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index db9a9da..924d9a3 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -26,6 +26,7 @@ import org.hbase.async.KeyValue
 
 object SKeyValue {
   val EdgeCf = "e".getBytes("UTF-8")
+  val VertexCf = "v".getBytes("UTF-8")
   val Put = 1
   val Delete = 2
   val Increment = 3

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
deleted file mode 100644
index 6de0b30..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-object Serializable {
-  val vertexCf = "v".getBytes("UTF-8")
-  val edgeCf = "e".getBytes("UTF-8")
-}
-
-trait Serializable[E] extends StorageSerializable[E]