You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/30 12:07:20 UTC

[4/7] incubator-s2graph git commit: - add graph on Vertex. - change colId into ServiceColumn in VertexId.

- add graph on Vertex.
- change colId into ServiceColumn in VertexId.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/189bc41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/189bc41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/189bc41e

Branch: refs/heads/master
Commit: 189bc41e04818833792899b37e15ffe88f9f98ad
Parents: a81a74c
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Nov 25 09:59:54 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Nov 25 09:59:54 2016 +0900

----------------------------------------------------------------------
 .../loader/subscriber/TransferToHFile.scala     |  2 +-
 .../scala/org/apache/s2graph/core/Edge.scala    | 12 +++----
 .../scala/org/apache/s2graph/core/Graph.scala   | 36 +++++++++++++++++---
 .../scala/org/apache/s2graph/core/Vertex.scala  | 25 +++-----------
 .../s2graph/core/mysqls/ServiceColumn.scala     |  3 +-
 .../s2graph/core/rest/RequestParser.scala       |  6 ++--
 .../apache/s2graph/core/rest/RestHandler.scala  |  2 +-
 .../apache/s2graph/core/storage/Storage.scala   | 10 +++---
 .../tall/IndexEdgeDeserializable.scala          | 12 +++----
 .../wide/IndexEdgeDeserializable.scala          | 10 +++---
 .../tall/SnapshotEdgeDeserializable.scala       | 12 +++----
 .../wide/SnapshotEdgeDeserializable.scala       |  6 ++--
 .../serde/vertex/VertexDeserializable.scala     |  7 ++--
 .../apache/s2graph/core/types/VertexId.scala    | 31 +++++++++--------
 .../org/apache/s2graph/core/EdgeTest.scala      | 22 ++++++------
 .../s2graph/core/Integrate/QueryTest.scala      | 12 +++----
 .../s2graph/core/benchmark/GraphUtilSpec.scala  |  3 +-
 .../s2graph/core/parsers/WhereParserTest.scala  | 12 +++----
 18 files changed, 119 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 3345d56..d1da319 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -97,7 +97,7 @@ object TransferToHFile extends SparkApp {
     val innerVal = JSONParser.jsValueToInnerVal(Json.toJson(vertexId), label.srcColumnWithDir(dir).columnType, label.schemaVersion).getOrElse {
       throw new RuntimeException(s"$vertexId can not be converted into innerval")
     }
-    val vertex = Vertex(SourceVertexId(label.srcColumn.id.get, innerVal))
+    val vertex = GraphSubscriberHelper.g.newVertex(SourceVertexId(label.srcColumn, innerVal))
 
     val ts = System.currentTimeMillis()
     val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
index 87f9cd7..f10b4db 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala
@@ -366,18 +366,18 @@ case class Edge(innerGraph: Graph,
   def srcForVertex = {
     val belongLabelIds = Seq(labelWithDir.labelId)
     if (labelWithDir.dir == GraphUtil.directions("in")) {
-      Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
     } else {
-      Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 
   def tgtForVertex = {
     val belongLabelIds = Seq(labelWithDir.labelId)
     if (labelWithDir.dir == GraphUtil.directions("in")) {
-      Vertex(VertexId(innerLabel.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
+      innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
     } else {
-      Vertex(VertexId(innerLabel.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
+      innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
     }
   }
 
@@ -440,8 +440,8 @@ case class Edge(innerGraph: Graph,
 
 
   def updateTgtVertex(id: InnerValLike) = {
-    val newId = TargetVertexId(tgtVertex.id.colId, id)
-    val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
+    val newId = TargetVertexId(tgtVertex.id.column, id)
+    val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props)
     Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index ec3f286..38477b4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -26,7 +26,7 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.commons.configuration.Configuration
 import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service}
+import org.apache.s2graph.core.mysqls._
 import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
 import org.apache.s2graph.core.storage.{SKeyValue, Storage}
 import org.apache.s2graph.core.types._
@@ -1069,7 +1069,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph
                 ts: Long = System.currentTimeMillis(),
                 operation: String = "insert",
                 withWait: Boolean = true): Future[Boolean] = {
-    val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation))
+    val innerVertices = Seq(toVertex(serviceName, columnName, id, props.toMap, ts, operation))
     mutateVertices(innerVertices, withWait).map(_.headOption.getOrElse(false))
   }
 
@@ -1122,7 +1122,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph
   def toVertex(parts: Array[String]): Option[Vertex] = Try {
     val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5))
     val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any]
-    val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
+    val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation)
     Option(vertex)
   } recover {
     case e: Throwable =>
@@ -1130,6 +1130,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph
       throw e
   } get
 
+
   def newSnapshotEdge(srcVertex: Vertex,
                       tgtVertex: Vertex,
                       label: Label,
@@ -1180,8 +1181,8 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph
     val srcColId = label.srcColumn.id.get
     val tgtColId = label.tgtColumn.id.get
 
-    val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis())
-    val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())
+    val srcVertex = newVertex(SourceVertexId(label.srcColumn, srcVertexId), System.currentTimeMillis())
+    val tgtVertex = newVertex(TargetVertexId(label.tgtColumn, tgtVertexId), System.currentTimeMillis())
     val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported."))
 
     val labelWithDir = LabelWithDirection(label.id.get, dir)
@@ -1192,6 +1193,31 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph
     new Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs)
   }
 
+  def newVertex(id: VertexId,
+                ts: Long = System.currentTimeMillis(),
+                props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
+                op: Byte = 0,
+                belongLabelIds: Seq[Int] = Seq.empty): Vertex = {
+    new Vertex(this, id, ts, props, op, belongLabelIds)
+  }
+  def toVertex(serviceName: String,
+               columnName: String,
+               id: Any,
+               props: Map[String, Any] = Map.empty,
+               ts: Long = System.currentTimeMillis(),
+               operation: String = "insert"): Vertex = {
+
+    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
+    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
+    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
+
+    val srcVertexId = VertexId(column, toInnerVal(id.toString, column.columnType, column.schemaVersion))
+    val propsInner = column.propsToInnerVals(props) ++
+      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
+
+    new Vertex(this, srcVertexId, ts, propsInner, op)
+  }
+
   override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ???
 
   override def tx(): Transaction = ???

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
index 0ff4f98..57c9824 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala
@@ -29,7 +29,8 @@ import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality
 import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph}
 import play.api.libs.json.Json
 
-case class Vertex(id: VertexId,
+case class Vertex(graph: Graph,
+                  id: VertexId,
                   ts: Long = System.currentTimeMillis(),
                   props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike],
                   op: Byte = 0,
@@ -72,7 +73,7 @@ case class Vertex(id: VertexId,
     meta <- ColumnMeta.findByIdAndSeq(id.colId, seq.toByte)
   } yield (meta.name -> v.toString)
 
-  def toEdgeVertex() = Vertex(SourceVertexId(id.colId, innerId), ts, props, op)
+  def toEdgeVertex() = graph.newVertex(SourceVertexId(id.column, innerId), ts, props, op)
 
 
   override def hashCode() = {
@@ -91,7 +92,7 @@ case class Vertex(id: VertexId,
     }
   }
 
-  def withProps(newProps: Map[Int, InnerValLike]) = Vertex(id, ts, newProps, op)
+  def withProps(newProps: Map[Int, InnerValLike]) = graph.newVertex(id, ts, newProps, op)
 
   def toLogString(): String = {
     val (serviceName, columnName) =
@@ -116,8 +117,6 @@ case class Vertex(id: VertexId,
 
   override def remove(): Unit = ???
 
-  override def graph(): Graph = ???
-
   override def label(): String = ???
 }
 
@@ -129,21 +128,5 @@ object Vertex {
 
   def isLabelId(propKey: Int): Boolean = propKey > Byte.MaxValue
 
-  def toVertex(serviceName: String,
-            columnName: String,
-            id: Any,
-            props: Map[String, Any] = Map.empty,
-            ts: Long = System.currentTimeMillis(),
-            operation: String = "insert"): Vertex = {
-
-    val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found."))
-    val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found."))
-    val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported."))
 
-    val srcVertexId = VertexId(column.id.get, toInnerVal(id.toString, column.columnType, column.schemaVersion))
-    val propsInner = column.propsToInnerVals(props) ++
-      Map(ColumnMeta.timeStampSeq.toInt -> InnerVal.withLong(ts, column.schemaVersion))
-
-    new Vertex(srcVertexId, ts, propsInner, op)
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
index 6fceabc..85b6929 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
@@ -25,10 +25,11 @@ package org.apache.s2graph.core.mysqls
 
 import org.apache.s2graph.core.JSONParser
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.types.{InnerValLikeWithTs, InnerValLike}
+import org.apache.s2graph.core.types.{HBaseType, InnerValLikeWithTs, InnerValLike}
 import play.api.libs.json.Json
 import scalikejdbc._
 object ServiceColumn extends Model[ServiceColumn] {
+  val Default = ServiceColumn(Option(HBaseType.DEFAULT_COL_ID), 0, "default", "string", "v4")
 
   def apply(rs: WrappedResultSet): ServiceColumn = {
     ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 805a544..13e02a0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -268,7 +268,7 @@ class RequestParser(graph: Graph) {
       id <- ids
       innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion)
     } yield {
-      Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis())
+      graph.newVertex(SourceVertexId(serviceColumn, innerId), System.currentTimeMillis())
     }
 
     vertices
@@ -358,7 +358,7 @@ class RequestParser(graph: Graph) {
         idJson = (value \ "id").asOpt[JsValue].map(Seq(_)).getOrElse(Nil)
         idsJson = (value \ "ids").asOpt[Seq[JsValue]].getOrElse(Nil)
         id <- (idJson ++ idsJson).flatMap(jsValueToAny(_).toSeq).distinct
-      } yield Vertex.toVertex(serviceName, columnName, id)
+      } yield graph.toVertex(serviceName, columnName, id)
 
       if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty")
       val steps = parse[Vector[JsValue]](jsValue, "steps")
@@ -586,7 +586,7 @@ class RequestParser(graph: Graph) {
     val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get
     val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get
     val props = fromJsonToProperties((jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()))
-    Vertex.toVertex(sName, cName, id.toString, props, ts, operation)
+    graph.toVertex(sName, cName, id.toString, props, ts, operation)
   }
 
   def toPropElements(jsObj: JsValue) = Try {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 099a7f9..2d34c7a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -219,7 +219,7 @@ class RestHandler(graph: Graph)(implicit ec: ExecutionContext) {
         idJson <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])
         id <- jsValueToAny(idJson)
       } yield {
-        Vertex.toVertex(serviceName, columnName, id)
+        graph.toVertex(serviceName, columnName, id)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 26d6ad1..efe7a3d 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -133,7 +133,7 @@ abstract class Storage[Q, R](val graph: Graph,
     indexEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}"))
 
   /** create deserializer that can parser stored CanSKeyValue into vertex. */
-  val vertexDeserializer: Deserializable[Vertex] = new VertexDeserializable
+  val vertexDeserializer: Deserializable[Vertex] = new VertexDeserializable(graph)
 
 
   /**
@@ -973,14 +973,14 @@ abstract class Storage[Q, R](val graph: Graph,
         /** we use toSnapshotEdge so dont need to swap src, tgt */
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
         val tgt = InnerVal.convertVersion(tgtVertexId, tgtColumn.columnType, label.schemaVersion)
-        val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), TargetVertexId(tgtColumn.id.get, tgt))
-        val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId))
+        val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt))
+        val (srcV, tgtV) = (graph.newVertex(srcVId), graph.newVertex(tgtVId))
 
         graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs)
       case None =>
         val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion)
-        val srcVId = SourceVertexId(srcColumn.id.get, src)
-        val srcV = Vertex(srcVId)
+        val srcVId = SourceVertexId(srcColumn, src)
+        val srcV = graph.newVertex(srcVId)
 
         graph.newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges)
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index c538e53..e11a5f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.indexedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
 import org.apache.s2graph.core.types._
@@ -61,7 +61,7 @@ class IndexEdgeDeserializable(graph: Graph,
 
      val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
 
-     val srcVertex = Vertex(srcVertexId, version)
+     val srcVertex = graph.newVertex(srcVertexId, version)
      //TODO:
      val edge = graph.newEdge(srcVertex, null,
        label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
@@ -71,11 +71,11 @@ class IndexEdgeDeserializable(graph: Graph,
        // degree
        //      val degreeVal = Bytes.toLong(kv.value)
        val degreeVal = bytesToLongFunc(kv.value, 0)
-       val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+       val tgtVertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
 
        edge.property(LabelMeta.timestamp.name, version, version)
        edge.property(LabelMeta.degree.name, degreeVal, version)
-       edge.tgtVertex = Vertex(tgtVertexId, version)
+       edge.tgtVertex = graph.newVertex(tgtVertexId, version)
        edge.op = GraphUtil.defaultOpByte
        edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
        edge
@@ -125,11 +125,11 @@ class IndexEdgeDeserializable(graph: Graph,
        val tgtVertexId =
          if (edge.checkProperty(LabelMeta.to.name)) {
            val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-           TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+           TargetVertexId(ServiceColumn.Default, vId.innerVal)
          } else tgtVertexIdRaw
 
 
-       edge.tgtVertex = Vertex(tgtVertexId, version)
+       edge.tgtVertex = graph.newVertex(tgtVertexId, version)
        edge.op = op
        edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
        edge

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index 2b620a1..60f7d80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.core.storage.serde.indexedge.wide
 
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage._
 import org.apache.s2graph.core.types._
@@ -37,7 +37,7 @@ class IndexEdgeDeserializable(graph: Graph,
      //    val degree = Bytes.toLong(kv.value)
      val degree = bytesToLongFunc(kv.value, 0)
      val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree, schemaVer))
-     val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+     val tgtVertexIdRaw = VertexId(ServiceColumn.Default, InnerVal.withStr("0", schemaVer))
      (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
    }
 
@@ -82,7 +82,7 @@ class IndexEdgeDeserializable(graph: Graph,
      val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv, schemaVer)
 
      val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
-     val srcVertex = Vertex(srcVertexId, version)
+     val srcVertex = graph.newVertex(srcVertexId, version)
      //TODO:
      val edge = graph.newEdge(srcVertex, null,
        label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState)
@@ -125,10 +125,10 @@ class IndexEdgeDeserializable(graph: Graph,
      val tgtVertexId =
        if (edge.checkProperty(LabelMeta.to.name)) {
          val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs
-         TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+         TargetVertexId(ServiceColumn.Default, vId.innerVal)
        } else tgtVertexIdRaw
 
-     edge.tgtVertex = Vertex(tgtVertexId, version)
+     edge.tgtVertex = graph.newVertex(tgtVertexId, version)
      edge.op = op
      edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer))
      edge

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 37aafcf..6c1906e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.storage.serde.snapshotedge.tall
 
 import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelIndex, LabelMeta}
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
 import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
@@ -62,8 +62,8 @@ class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEd
       (e.srcVertex.innerId, e.tgtVertex.innerId, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
     }.getOrElse(parseRowV3(kv, schemaVer))
 
-    val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
-    val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
+    val srcVertexId = SourceVertexId(ServiceColumn.Default, srcInnerId)
+    val tgtVertexId = SourceVertexId(ServiceColumn.Default, tgtInnerId)
 
     val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
       var pos = 0
@@ -87,8 +87,8 @@ class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEd
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
           val pendingEdge =
-            graph.newEdge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
+            graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
+              graph.newVertex(tgtVertexId, cellVersion),
               label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
               statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
@@ -98,7 +98,7 @@ class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEd
       (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }
 
-    graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+    graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
       label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
       pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 1d4e195..64b2e31 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -73,8 +73,8 @@ class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEd
           val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
 
           val pendingEdge =
-            graph.newEdge(Vertex(srcVertexId, cellVersion),
-              Vertex(tgtVertexId, cellVersion),
+            graph.newEdge(graph.newVertex(srcVertexId, cellVersion),
+              graph.newVertex(tgtVertexId, cellVersion),
               label, labelWithDir.dir, pendingEdgeOp,
               cellVersion, pendingEdgeProps.toMap,
               statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
@@ -84,7 +84,7 @@ class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEd
       (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
     }
 
-    graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
+    graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts),
       label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
       pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index 3ec17ab..6e2311f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -23,11 +23,12 @@ import org.apache.s2graph.core.mysqls.Label
 import org.apache.s2graph.core.storage.StorageDeserializable._
 import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
-import org.apache.s2graph.core.{QueryParam, Vertex}
+import org.apache.s2graph.core.{Graph, QueryParam, Vertex}
 
 import scala.collection.mutable.ListBuffer
 
-class VertexDeserializable(bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] {
+class VertexDeserializable(graph: Graph,
+                           bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] {
   def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
                                           _kvs: Seq[T],
                                           version: String,
@@ -61,6 +62,6 @@ class VertexDeserializable(bytesToInt: (Array[Byte], Int) => Int = bytesToInt) e
       }
     }
     assert(maxTs != Long.MinValue)
-    Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
+    graph.newVertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
index 24b30fb..a949f3e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/VertexId.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.types
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.mysqls.ServiceColumn
 import org.apache.s2graph.core.types.HBaseType._
 
 object VertexId extends HBaseDeserializable {
@@ -35,42 +36,44 @@ object VertexId extends HBaseDeserializable {
     val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
     pos += numOfBytesUsed
     val colId = Bytes.toInt(bytes, pos, 4)
-    (VertexId(colId, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4)
+    val column = ServiceColumn.findById(colId)
+    (VertexId(column, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed + 4)
   }
 
-  def apply(colId: Int, innerId: InnerValLike): VertexId = new VertexId(colId, innerId)
+  def apply(column: ServiceColumn, innerId: InnerValLike): VertexId = new VertexId(column, innerId)
 
   def toSourceVertexId(vid: VertexId) = {
-    SourceVertexId(vid.colId, vid.innerId)
+    SourceVertexId(vid.column, vid.innerId)
   }
 
   def toTargetVertexId(vid: VertexId) = {
-    TargetVertexId(vid.colId, vid.innerId)
+    TargetVertexId(vid.column, vid.innerId)
   }
 }
 
-class VertexId protected (val colId: Int, val innerId: InnerValLike) extends HBaseSerializable {
+class VertexId protected (val column: ServiceColumn, val innerId: InnerValLike) extends HBaseSerializable {
   val storeHash: Boolean = true
   val storeColId: Boolean = true
+  val colId = column.id.get
   lazy val hashBytes =
 //    if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toString))
     if (storeHash) Bytes.toBytes(GraphUtil.murmur3(innerId.toIdString()))
     else Array.empty[Byte]
 
   lazy val colIdBytes: Array[Byte] =
-    if (storeColId) Bytes.toBytes(colId)
+    if (storeColId) Bytes.toBytes(column.id.get)
     else Array.empty[Byte]
 
   def bytes: Array[Byte] = Bytes.add(hashBytes, innerId.bytes, colIdBytes)
 
   override def toString(): String = {
-    colId.toString() + "," + innerId.toString()
+    column.id.get.toString() + "," + innerId.toString()
 //    s"VertexId($colId, $innerId)"
   }
 
   override def hashCode(): Int = {
     val ret = if (storeColId) {
-      colId * 31 + innerId.hashCode()
+      column.id.get * 31 + innerId.hashCode()
     } else {
       innerId.hashCode()
     }
@@ -105,14 +108,14 @@ object SourceVertexId extends HBaseDeserializable {
     val pos = offset + GraphUtil.bytesForMurMurHash
     val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, len, version, isVertexId = true)
 
-    (SourceVertexId(DEFAULT_COL_ID, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed)
+    (SourceVertexId(ServiceColumn.Default, innerId), GraphUtil.bytesForMurMurHash + numOfBytesUsed)
   }
 
 }
 
 
-case class SourceVertexId(override val colId: Int,
-                          override val innerId: InnerValLike) extends VertexId(colId, innerId) {
+case class SourceVertexId(override val column: ServiceColumn,
+                          override val innerId: InnerValLike) extends VertexId(column, innerId) {
   override val storeColId: Boolean = false
 }
 
@@ -124,13 +127,13 @@ object TargetVertexId extends HBaseDeserializable {
                 version: String = DEFAULT_VERSION): (VertexId, Int) = {
     /* murmur has is not prepended so start from offset */
     val (innerId, numOfBytesUsed) = InnerVal.fromBytes(bytes, offset, len, version, isVertexId = true)
-    (TargetVertexId(DEFAULT_COL_ID, innerId), numOfBytesUsed)
+    (TargetVertexId(ServiceColumn.Default, innerId), numOfBytesUsed)
   }
 }
 
-case class TargetVertexId(override val colId: Int,
+case class TargetVertexId(override val column: ServiceColumn,
                           override val innerId: InnerValLike)
-  extends  VertexId(colId, innerId) {
+  extends  VertexId(column, innerId) {
   override val storeColId: Boolean = false
   override val storeHash: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
index 3032d9e..55b796d 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/EdgeTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core
 
 import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{ServiceColumn, LabelMeta}
 import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs, VertexId}
 import org.apache.s2graph.core.utils.logger
 import org.scalatest.FunSuite
@@ -65,8 +65,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
 
   test("buildOperation") {
     val schemaVersion = "v2"
-    val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = Vertex(vertexId)
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -92,8 +92,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
 
   test("buildMutation: snapshotEdge: None with newProps") {
     val schemaVersion = "v2"
-    val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = Vertex(vertexId)
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -119,8 +119,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
 
   test("buildMutation: oldPropsWithTs == newPropsWithTs, Drop all requests") {
     val schemaVersion = "v2"
-    val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = Vertex(vertexId)
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -143,8 +143,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
 
   test("buildMutation: All props older than snapshotEdge's LastDeletedAt") {
     val schemaVersion = "v2"
-    val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = Vertex(vertexId)
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)
@@ -177,8 +177,8 @@ class EdgeTest extends FunSuite with TestCommon with TestCommonWithModels {
 
   test("buildMutation: All props newer than snapshotEdge's LastDeletedAt") {
     val schemaVersion = "v2"
-    val vertexId = VertexId(0, InnerVal.withStr("dummy", schemaVersion))
-    val srcVertex = Vertex(vertexId)
+    val vertexId = VertexId(ServiceColumn.Default, InnerVal.withStr("dummy", schemaVersion))
+    val srcVertex = graph.newVertex(vertexId)
     val tgtVertex = srcVertex
 
     val timestampProp = LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 1)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
index f58b192..9888b7e 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/Integrate/QueryTest.scala
@@ -70,7 +70,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   def getQuery(id: Int, where: String): Query =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(Seq(QueryParam(testLabelName, where = Where(testLabelName, where))))
       )
@@ -78,7 +78,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   def queryIntervalWithParent(id: Int, index: String, prop: String, value: String) =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(Seq(QueryParam(testLabelName, indexName = index))),
         Step(Seq(QueryParam(testLabelName, indexName = index,
@@ -91,7 +91,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
                                    prop: String, value: String,
                                    toProp: String, toValue: String) =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(Seq(QueryParam(testLabelName, indexName = index))),
         Step(Seq(QueryParam(testLabelName, indexName = index,
@@ -102,7 +102,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(Seq(QueryParam(testLabelName, indexName = index,
           intervalOpt = Option(Seq(prop -> JsNumber(fromVal)), Seq(prop -> JsNumber(toVal))))))
@@ -111,7 +111,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   def queryExclude(id: Int) =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(
           Seq(
@@ -124,7 +124,7 @@ class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
 
   def queryGroupBy(id: Int, props: Seq[String]) =
     Query(
-      vertices = Seq(Vertex.toVertex(testServiceName, testColumnName, id)),
+      vertices = Seq(graph.toVertex(testServiceName, testColumnName, id)),
       steps = Vector(
         Step(
           Seq(QueryParam(testLabelName))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
index 3cb216c..03ea50a 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/benchmark/GraphUtilSpec.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.benchmark
 
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.s2graph.core.GraphUtil
+import org.apache.s2graph.core.mysqls.ServiceColumn
 import org.apache.s2graph.core.types.{HBaseType, InnerVal, SourceVertexId}
 
 import scala.collection.mutable
@@ -85,7 +86,7 @@ class GraphUtilSpec extends BenchmarkCommon {
       stats += (0 -> (rangeBytes.head -> 0L))
 
       for (i <- (0L until testNum)) {
-        val vertexId = SourceVertexId(DEFAULT_COL_ID, InnerVal.withLong(i, HBaseType.DEFAULT_VERSION))
+        val vertexId = SourceVertexId(ServiceColumn.Default, InnerVal.withLong(i, HBaseType.DEFAULT_VERSION))
         val bytes = vertexId.bytes
         val shortKey = GraphUtil.murmur3(vertexId.innerId.toIdString())
         val shortVal = counts.getOrElse(shortKey, 0L) + 1L

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/189bc41e/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
index 9576af2..cb6090e 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/parsers/WhereParserTest.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.parsers
 
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.mysqls.{ServiceColumn, Label, LabelMeta}
 import org.apache.s2graph.core.rest.TemplateHelper
 import org.apache.s2graph.core.types._
 import org.apache.s2graph.core.utils.logger
@@ -62,13 +62,13 @@ class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
   def ids = for {
     version <- Seq(VERSION1, VERSION2)
   } yield {
-    val srcId = SourceVertexId(0, InnerVal.withLong(1, version))
+    val srcId = SourceVertexId(ServiceColumn.Default, InnerVal.withLong(1, version))
     val tgtId =
-      if (version == VERSION2) TargetVertexId(0, InnerVal.withStr("2", version))
-      else TargetVertexId(0, InnerVal.withLong(2, version))
+      if (version == VERSION2) TargetVertexId(ServiceColumn.Default, InnerVal.withStr("2", version))
+      else TargetVertexId(ServiceColumn.Default, InnerVal.withLong(2, version))
 
-    val srcVertex = Vertex(srcId, ts)
-    val tgtVertex = Vertex(tgtId, ts)
+    val srcVertex = graph.newVertex(srcId, ts)
+    val tgtVertex = graph.newVertex(tgtId, ts)
     val (_label, dir) = if (version == VERSION2) (labelV2, labelWithDirV2.dir) else (label, labelWithDir.dir)
 
     (srcVertex, tgtVertex, _label, dir)