+package com.kakao.s2graph.core.Integrate
+import org.scalatest.BeforeAndAfterEach
+import play.api.libs.json._
+class QueryTest extends IntegrateCommon with BeforeAndAfterEach {
+  import TestUtil._
+  val insert = "insert"
+  val e = "e"
+  val weight = "weight"
+  val is_hidden = "is_hidden"
+  test("interval") {
+    def queryWithInterval(id: Int, index: String, prop: String, fromVal: Int, toVal: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+          [ {
+              "label": "$testLabelName",
+              "index": "$index",
+              "interval": {
+                  "from": [ { "$prop": $fromVal } ],
+                  "to": [ { "$prop": $toVal } ]
+              }
+            }
+          ]]
+        }
+        """)
+    var edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 1001)) // test interval on timestamp index
+    (edges \ "size").toString should be("1")
+    edges = getEdgesSync(queryWithInterval(0, index2, "_timestamp", 1000, 2000)) // test interval on timestamp index
+    (edges \ "size").toString should be("2")
+    edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 11)) // test interval on weight index
+    (edges \ "size").toString should be("1")
+    edges = getEdgesSync(queryWithInterval(2, index1, "weight", 10, 20)) // test interval on weight index
+    (edges \ "size").toString should be("2")
+  }
+  test("get edge with where condition") {
+    def queryWhere(id: Int, where: String) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "limit": 100,
+              "where": "${where}"
+            }
+          ]]
+        }""")
+    var result = getEdgesSync(queryWhere(0, "is_hidden=false and _from in (-1, 0)"))
+    (result \ "results").as[List[JsValue]].size should be(1)
+    result = getEdgesSync(queryWhere(0, "is_hidden=true and _to in (1)"))
+    (result \ "results").as[List[JsValue]].size should be(1)
+    result = getEdgesSync(queryWhere(0, "_from=0"))
+    (result \ "results").as[List[JsValue]].size should be(2)
+    result = getEdgesSync(queryWhere(2, "_from=2 or weight in (-1)"))
+    (result \ "results").as[List[JsValue]].size should be(2)
+    result = getEdgesSync(queryWhere(2, "_from=2 and weight in (10, 20)"))
+    (result \ "results").as[List[JsValue]].size should be(2)
+  }
+  test("get edge exclude") {
+    def queryExclude(id: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "limit": 2
+            },
+            {
+              "label": "${testLabelName}",
+              "direction": "in",
+              "offset": 0,
+              "limit": 2,
+              "exclude": true
+            }
+          ]]
+        }""")
+    val result = getEdgesSync(queryExclude(0))
+    (result \ "results").as[List[JsValue]].size should be(1)
+  }
+  test("get edge groupBy property") {
+    def queryGroupBy(id: Int, props: Seq[String]): JsValue = {
+      Json.obj(
+        "groupBy" -> props,
+        "srcVertices" -> Json.arr(
+          Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id)
+        ),
+        "steps" -> Json.arr(
+          Json.obj(
+            "step" -> Json.arr(
+              Json.obj(
+                "label" -> testLabelName
+              )
+            )
+          )
+        )
+      )
+    }
+    val result = getEdgesSync(queryGroupBy(0, Seq("weight")))
+    (result \ "size").as[Int] should be(2)
+    val weights = (result \\ "groupBy").map { js =>
+      (js \ "weight").as[Int]
+    }
+    weights should contain(30)
+    weights should contain(40)
+    weights should not contain (10)
+  }
+  test("edge transform") {
+    def queryTransform(id: Int, transforms: String) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": 0,
+              "transform": $transforms
+            }
+          ]]
+        }""")
+    var result = getEdgesSync(queryTransform(0, "[[\"_to\"]]"))
+    (result \ "results").as[List[JsValue]].size should be(2)
+    result = getEdgesSync(queryTransform(0, "[[\"weight\"]]"))
+    (result \\ "to").map(_.toString).sorted should be((result \\ "weight").map(_.toString).sorted)
+    result = getEdgesSync(queryTransform(0, "[[\"_from\"]]"))
+    val results = (result \ "results").as[JsValue]
+    (result \\ "to").map(_.toString).sorted should be((results \\ "from").map(_.toString).sorted)
+  }
+  test("index") {
+    def queryIndex(ids: Seq[Int], indexName: String) = {
+      val $from = Json.arr(
+        Json.obj("serviceName" -> testServiceName,
+          "columnName" -> testColumnName,
+          "ids" -> ids))
+      val $step = Json.arr(Json.obj("label" -> testLabelName, "index" -> indexName))
+      val $steps = Json.arr(Json.obj("step" -> $step))
+      val js = Json.obj("withScore" -> false, "srcVertices" -> $from, "steps" -> $steps)
+      js
+    }
+    // weight order
+    var result = getEdgesSync(queryIndex(Seq(0), "idx_1"))
+    ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(40))
+    // timestamp order
+    result = getEdgesSync(queryIndex(Seq(0), "idx_2"))
+    ((result \ "results").as[List[JsValue]].head \\ "weight").head should be(JsNumber(30))
+  }
+  //    "checkEdges" in {
+  //      running(FakeApplication()) {
+  //        val json = Json.parse( s"""
+  //         [{"from": 0, "to": 1, "label": "$testLabelName"},
+  //          {"from": 0, "to": 2, "label": "$testLabelName"}]
+  //        """)
+  //
+  //        def checkEdges(queryJson: JsValue): JsValue = {
+  //          val ret = route(FakeRequest(POST, "/graphs/checkEdges").withJsonBody(queryJson)).get
+  //          contentAsJson(ret)
+  //        }
+  //
+  //        val res = checkEdges(json)
+  //        val typeRes = res.isInstanceOf[JsArray]
+  //        typeRes must equalTo(true)
+  //
+  //        val fst =[Seq[JsValue]].head \ "to"
+  //[Int] must equalTo(1)
+  //
+  //        val snd =[Seq[JsValue]].last \ "to"
+  //[Int] must equalTo(2)
+  //      }
+  //    }
+  test("return tree") {
+    def queryParents(id: Long) = Json.parse(
+      s"""
+        {
+          "returnTree": true,
+          "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+          [ {
+              "label": "$testLabelName",
+              "direction": "out",
+              "offset": 0,
+              "limit": 2
+            }
+          ],[{
+              "label": "$testLabelName",
+              "direction": "in",
+              "offset": 0,
+              "limit": -1
+            }
+          ]]
+        }""".stripMargin)
+    val src = 100
+    val tgt = 200
+    insertEdgesSync(toEdge(1001, "insert", "e", src, tgt, testLabelName))
+    val result = TestUtil.getEdgesSync(queryParents(src))
+    val parents = (result \ "results").as[Seq[JsValue]]
+    val ret = parents.forall {
+      edge => (edge \ "parents").as[Seq[JsValue]].size == 1
+    }
+    ret should be(true)
+  }
+  test("pagination and _to") {
+    def querySingleWithTo(id: Int, offset: Int = 0, limit: Int = 100, to: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "${testServiceName}",
+            "columnName": "${testColumnName}",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelName}",
+              "direction": "out",
+              "offset": $offset,
+              "limit": $limit,
+              "_to": $to
+            }
+          ]]
+        }
+        """)
+    val src = System.currentTimeMillis().toInt
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, src, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, src, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, src, 3, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, src, 4, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+    var result = getEdgesSync(querySingle(src, offset = 0, limit = 2))
+    var edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(4)
+    (edges(1) \ "to").as[Long] should be(3)
+    result = getEdgesSync(querySingle(src, offset = 1, limit = 2))
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(2)
+    (edges(0) \ "to").as[Long] should be(3)
+    (edges(1) \ "to").as[Long] should be(2)
+    result = getEdgesSync(querySingleWithTo(src, offset = 0, limit = -1, to = 1))
+    edges = (result \ "results").as[List[JsValue]]
+    edges.size should be(1)
+  }
+  test("order by") {
+    def queryScore(id: Int, scoring: Map[String, Int]): JsValue = Json.obj(
+      "srcVertices" -> Json.arr(
+        Json.obj(
+          "serviceName" -> testServiceName,
+          "columnName" -> testColumnName,
+          "id" -> id
+        )
+      ),
+      "steps" -> Json.arr(
+        Json.obj(
+          "step" -> Json.arr(
+            Json.obj(
+              "label" -> testLabelName,
+              "scoring" -> scoring
+            )
+          )
+        )
+      )
+    )
+    def queryOrderBy(id: Int, scoring: Map[String, Int], props: Seq[Map[String, String]]): JsValue = Json.obj(
+      "orderBy" -> props,
+      "srcVertices" -> Json.arr(
+        Json.obj("serviceName" -> testServiceName, "columnName" -> testColumnName, "id" -> id)
+      ),
+      "steps" -> Json.arr(
+        Json.obj(
+          "step" -> Json.arr(
+            Json.obj(
+              "label" -> testLabelName,
+              "scoring" -> scoring
+            )
+          )
+        )
+      )
+    )
+    val bulkEdges = Seq(
+      toEdge(1001, insert, e, 0, 1, testLabelName, Json.obj(weight -> 10, is_hidden -> true)),
+      toEdge(2002, insert, e, 0, 2, testLabelName, Json.obj(weight -> 20, is_hidden -> false)),
+      toEdge(3003, insert, e, 2, 0, testLabelName, Json.obj(weight -> 30)),
+      toEdge(4004, insert, e, 2, 1, testLabelName, Json.obj(weight -> 40))
+    )
+    insertEdgesSync(bulkEdges: _*)
+    // get edges
+    val edges = getEdgesSync(queryScore(0, Map("weight" -> 1)))
+    val orderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "DESC", "timestamp" -> "DESC"))))
+    val ascOrderByScore = getEdgesSync(queryOrderBy(0, Map("weight" -> 1), Seq(Map("score" -> "ASC", "timestamp" -> "DESC"))))
+    val edgesTo = edges \ "results" \\ "to"
+    val orderByTo = orderByScore \ "results" \\ "to"
+    val ascOrderByTo = ascOrderByScore \ "results" \\ "to"
+    edgesTo should be(Seq(JsNumber(2), JsNumber(1)))
+    edgesTo should be(orderByTo)
+    ascOrderByTo should be(Seq(JsNumber(1), JsNumber(2)))
+    edgesTo.reverse should be(ascOrderByTo)
+  }
+  test("query with sampling") {
+    def queryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 100,
+                "sample": $sample
+                }]
+            }
+          ]
+        }""")
+    def twoStepQueryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 100,
+                "sample": $sample
+                }]
+            },
+            {
+               "step": [{
+                 "label": "$testLabelName",
+                 "direction": "out",
+                 "offset": 0,
+                 "limit": 100,
+                 "sample": $sample
+               }]
+            }
+          ]
+        }""")
+    def twoQueryWithSampling(id: Int, sample: Int) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$testColumnName",
+            "id": $id
+           }],
+          "steps": [
+            {
+              "step": [{
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": 0,
+                "limit": 50,
+                "sample": $sample
+              },
+              {
+                "label": "$testLabelName2",
+                "direction": "out",
+                "offset": 0,
+                "limit": 50
+              }]
+            }
+          ]
+        }""")
+    val sampleSize = 2
+    val ts = "1442985659166"
+    val testId = 22
+    val bulkEdges = Seq(
+      toEdge(ts, insert, e, testId, 122, testLabelName),
+      toEdge(ts, insert, e, testId, 222, testLabelName),
+      toEdge(ts, insert, e, testId, 322, testLabelName),
+      toEdge(ts, insert, e, testId, 922, testLabelName2),
+      toEdge(ts, insert, e, testId, 222, testLabelName2),
+      toEdge(ts, insert, e, testId, 322, testLabelName2),
+      toEdge(ts, insert, e, 122, 1122, testLabelName),
+      toEdge(ts, insert, e, 122, 1222, testLabelName),
+      toEdge(ts, insert, e, 122, 1322, testLabelName),
+      toEdge(ts, insert, e, 222, 2122, testLabelName),
+      toEdge(ts, insert, e, 222, 2222, testLabelName),
+      toEdge(ts, insert, e, 222, 2322, testLabelName),
+      toEdge(ts, insert, e, 322, 3122, testLabelName),
+      toEdge(ts, insert, e, 322, 3222, testLabelName),
+      toEdge(ts, insert, e, 322, 3322, testLabelName)
+    )
+    insertEdgesSync(bulkEdges: _*)
+    val result1 = getEdgesSync(queryWithSampling(testId, sampleSize))
+    (result1 \ "results").as[List[JsValue]].size should be(math.min(sampleSize, bulkEdges.size))
+    val result2 = getEdgesSync(twoStepQueryWithSampling(testId, sampleSize))
+    (result2 \ "results").as[List[JsValue]].size should be(math.min(sampleSize * sampleSize, bulkEdges.size * bulkEdges.size))
+    val result3 = getEdgesSync(twoQueryWithSampling(testId, sampleSize))
+    (result3 \ "results").as[List[JsValue]].size should be(sampleSize + 3) // edges in testLabelName2 = 3
+  }
+  def querySingle(id: Int, offset: Int = 0, limit: Int = 100) = Json.parse(
+    s"""
+          { "srcVertices": [
+            { "serviceName": "$testServiceName",
+              "columnName": "$testColumnName",
+              "id": $id
+             }],
+            "steps": [
+            [ {
+                "label": "$testLabelName",
+                "direction": "out",
+                "offset": $offset,
+                "limit": $limit
+              }
+            ]]
+          }
+          """)
+  // called by each test, each
+  override def beforeEach = initTestData()
+  // called by start test, once
+  override def initTestData(): Unit = {
+    super.initTestData()
+    insertEdgesSync(
+      toEdge(1000, insert, e, 0, 1, testLabelName, Json.obj(weight -> 40, is_hidden -> true)),
+      toEdge(2000, insert, e, 0, 2, testLabelName, Json.obj(weight -> 30, is_hidden -> false)),
+      toEdge(3000, insert, e, 2, 0, testLabelName, Json.obj(weight -> 20)),
+      toEdge(4000, insert, e, 2, 1, testLabelName, Json.obj(weight -> 10)),
+      toEdge(3000, insert, e, 10, 20, testLabelName, Json.obj(weight -> 20)),
+      toEdge(4000, insert, e, 20, 20, testLabelName, Json.obj(weight -> 10)),
+      toEdge(1, insert, e, -1, 1000, testLabelName),
+      toEdge(1, insert, e, -1, 2000, testLabelName),
+      toEdge(1, insert, e, -1, 3000, testLabelName),
+      toEdge(1, insert, e, 1000, 10000, testLabelName),
+      toEdge(1, insert, e, 1000, 11000, testLabelName),
+      toEdge(1, insert, e, 2000, 11000, testLabelName),
+      toEdge(1, insert, e, 2000, 12000, testLabelName),
+      toEdge(1, insert, e, 3000, 12000, testLabelName),
+      toEdge(1, insert, e, 3000, 13000, testLabelName),
+      toEdge(1, insert, e, 10000, 100000, testLabelName),
+      toEdge(2, insert, e, 11000, 200000, testLabelName),
+      toEdge(3, insert, e, 12000, 300000, testLabelName)
+    )
+  }
+package com.kakao.s2graph.core.Integrate
+import java.util.concurrent.TimeUnit
+import play.api.libs.json.{JsValue, Json}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Random
+class StrongLabelDeleteTest extends IntegrateCommon {
+  import StrongDeleteUtil._
+  import TestUtil._
+  test("Strong consistency select") {
+    insertEdgesSync(bulkEdges(): _*)
+    var result = getEdgesSync(query(0))
+    (result \ "results").as[List[JsValue]].size should be(2)
+    result = getEdgesSync(query(10))
+    (result \ "results").as[List[JsValue]].size should be(2)
+  }
+  test("Strong consistency deleteAll") {
+    val deletedAt = 100
+    var result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(3)
+    val deleteParam = Json.arr(
+      Json.obj("label" -> testLabelName2,
+        "direction" -> "in",
+        "ids" -> Json.arr("20"),
+        "timestamp" -> deletedAt))
+    deleteAllSync(deleteParam)
+    result = getEdgesSync(query(11, direction = "out"))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+    result = getEdgesSync(query(12, direction = "out"))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+    result = getEdgesSync(query(10, direction = "out"))
+    println(result)
+    // 10 -> out -> 20 should not be in result.
+    (result \ "results").as[List[JsValue]].size should be(1)
+    (result \\ "to").size should be(1)
+    (result \\ "to")[String] should be("21")
+    result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+    result = getEdgesSync(query(20, direction = "in", columnName = testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(3)
+  }
+  test("update delete") {
+    val ret = for {
+      i <- 0 until testNum
+    } yield {
+      val src = System.currentTimeMillis()
+      val (ret, last) = testInner(i, src)
+      ret should be(true)
+      ret
+    }
+    ret.forall(identity)
+  }
+  test("update delete 2") {
+    val src = System.currentTimeMillis()
+    var ts = 0L
+    val ret = for {
+      i <- 0 until testNum
+    } yield {
+      val (ret, lastTs) = testInner(ts, src)
+      val deletedAt = lastTs + 1
+      val deletedAt2 = lastTs + 2
+      ts = deletedAt2 + 1 // nex start ts
+      ret should be(true)
+      val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
+      val deleteAllRequest2 = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt2))
+      val deleteRet = deleteAllSync(deleteAllRequest)
+      val deleteRet2 = deleteAllSync(deleteAllRequest2)
+      val result = getEdgesSync(query(id = src))
+      println(result)
+      val resultEdges = (result \ "results").as[Seq[JsValue]]
+      resultEdges.isEmpty should be(true)
+      val degreeAfterDeleteAll = getDegree(result)
+      degreeAfterDeleteAll should be(0)
+      degreeAfterDeleteAll === (0)
+    }
+    ret.forall(identity)
+  }
+  /** This test stress out test on degree
+    * when contention is low but number of adjacent edges are large
+    * Large set of contention test
+  */
+  test("large degrees") {
+    val labelName = testLabelName2
+    val dir = "out"
+    val maxSize = 100
+    val deleteSize = 10
+    val numOfConcurrentBatch = 100
+    val src = System.currentTimeMillis()
+    val tgts = (0 until maxSize).map { ith => src + ith }
+    val deleteTgts = Random.shuffle(tgts).take(deleteSize)
+    val insertRequests = { tgt =>
+      Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
+    }
+    val deleteRequests = deleteTgts.take(deleteSize).map { tgt =>
+      Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t")
+    }
+    val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
+    //        val allRequests = insertRequests ++ deleteRequests
+    val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests =>
+      insertEdgesAsync(bulkRequests: _*)
+    }
+    Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
+    val expectedDegree = insertRequests.size - deleteRequests.size
+    val queryJson = query(id = src)
+    val result = getEdgesSync(queryJson)
+    val resultSize = (result \ "size").as[Long]
+    val resultDegree = getDegree(result)
+    //        println(result)
+    val ret = resultSize == expectedDegree && resultDegree == resultSize
+    println(s"[MaxSize]: $maxSize")
+    println(s"[DeleteSize]: $deleteSize")
+    println(s"[ResultDegree]: $resultDegree")
+    println(s"[ExpectedDegree]: $expectedDegree")
+    println(s"[ResultSize]: $resultSize")
+    ret should be(true)
+  }
+  test("deleteAll") {
+    val labelName = testLabelName2
+    val dir = "out"
+    val maxSize = 100
+    val deleteSize = 10
+    val numOfConcurrentBatch = 100
+    val src = System.currentTimeMillis()
+    val tgts = (0 until maxSize).map { ith => src + ith }
+    val deleteTgts = Random.shuffle(tgts).take(deleteSize)
+    val insertRequests = { tgt =>
+      Seq(tgt, "insert", "e", src, tgt, labelName, "{}", dir).mkString("\t")
+    }
+    val deleteRequests = deleteTgts.take(deleteSize).map { tgt =>
+      Seq(tgt + 1000, "delete", "e", src, tgt, labelName, "{}", dir).mkString("\t")
+    }
+    val allRequests = Random.shuffle(insertRequests ++ deleteRequests)
+    val futures = allRequests.grouped(numOfConcurrentBatch).map { bulkRequests =>
+      insertEdgesAsync(bulkRequests: _*)
+    }
+    Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
+    val deletedAt = System.currentTimeMillis()
+    val deleteAllRequest = Json.arr(Json.obj("label" -> labelName, "ids" -> Json.arr(src), "timestamp" -> deletedAt))
+    deleteAllSync(deleteAllRequest)
+    val result = getEdgesSync(query(id = src))
+    println(result)
+    val resultEdges = (result \ "results").as[Seq[JsValue]]
+    resultEdges.isEmpty should be(true)
+    val degreeAfterDeleteAll = getDegree(result)
+    degreeAfterDeleteAll should be(0)
+  }
+  object StrongDeleteUtil {
+    val labelName = testLabelName2
+    val maxTgtId = 10
+    val batchSize = 10
+    val testNum = 3
+    val numOfBatch = 10
+    def testInner(startTs: Long, src: Long) = {
+      val labelName = testLabelName2
+      val lastOps = Array.fill(maxTgtId)("none")
+      var currentTs = startTs
+      val allRequests = for {
+        ith <- 0 until numOfBatch
+        jth <- 0 until batchSize
+      } yield {
+        currentTs += 1
+        val tgt = Random.nextInt(maxTgtId)
+        val op = if (Random.nextDouble() < 0.5) "delete" else "update"
+        lastOps(tgt) = op
+        Seq(currentTs, op, "e", src, src + tgt, labelName, "{}").mkString("\t")
+      }
+      allRequests.foreach(println(_))
+      val futures = Random.shuffle(allRequests).grouped(batchSize).map { bulkRequests =>
+        insertEdgesAsync(bulkRequests: _*)
+      }
+      Await.result(Future.sequence(futures), Duration(20, TimeUnit.MINUTES))
+      val expectedDegree = lastOps.count(op => op != "delete" && op != "none")
+      val queryJson = query(id = src)
+      val result = getEdgesSync(queryJson)
+      val resultSize = (result \ "size").as[Long]
+      val resultDegree = getDegree(result)
+      println(lastOps.toList)
+      println(result)
+      val ret = resultDegree == expectedDegree && resultSize == resultDegree
+      if (!ret) System.err.println(s"[Contention Failed]: $resultDegree, $expectedDegree")
+      (ret, currentTs)
+    }
+    def bulkEdges(startTs: Int = 0) = Seq(
+      toEdge(startTs + 1, "insert", "e", "0", "1", testLabelName2, s"""{"time": 10}"""),
+      toEdge(startTs + 2, "insert", "e", "0", "1", testLabelName2, s"""{"time": 11}"""),
+      toEdge(startTs + 3, "insert", "e", "0", "1", testLabelName2, s"""{"time": 12}"""),
+      toEdge(startTs + 4, "insert", "e", "0", "2", testLabelName2, s"""{"time": 10}"""),
+      toEdge(startTs + 5, "insert", "e", "10", "20", testLabelName2, s"""{"time": 10}"""),
+      toEdge(startTs + 6, "insert", "e", "10", "21", testLabelName2, s"""{"time": 11}"""),
+      toEdge(startTs + 7, "insert", "e", "11", "20", testLabelName2, s"""{"time": 12}"""),
+      toEdge(startTs + 8, "insert", "e", "12", "20", testLabelName2, s"""{"time": 13}""")
+    )
+    def query(id: Long, serviceName: String = testServiceName, columnName: String = testColumnName,
+              labelName: String = testLabelName2, direction: String = "out") = Json.parse(
+      s"""
+          { "srcVertices": [
+            { "serviceName": "$serviceName",
+              "columnName": "$columnName",
+              "id": $id
+             }],
+            "steps": [
+            [ {
+                "label": "$labelName",
+                "direction": "${direction}",
+                "offset": 0,
+                "limit": -1,
+                "duplicate": "raw"
+              }
+            ]]
+          }""")
+    def getDegree(jsValue: JsValue): Long = {
+      ((jsValue \ "degrees") \\ "_degree")[Long]).getOrElse(0L)
+    }
+  }
+package com.kakao.s2graph.core.Integrate
+import com.kakao.s2graph.core.PostProcess
+import play.api.libs.json.{JsValue, Json}
+import scala.concurrent.Await
+import scala.util.Random
+class VertexTestHelper extends IntegrateCommon {
+  import TestUtil._
+  import VertexTestHelper._
+  test("vertex") {
+    val ids = (7 until 20).map(tcNum => tcNum * 1000 + 0)
+    val (serviceName, columnName) = (testServiceName, testColumnName)
+    val data = vertexInsertsPayload(serviceName, columnName, ids)
+    val payload = Json.parse(Json.toJson(data).toString)
+    println(payload)
+    val vertices = parser.toVertices(payload, "insert", Option(serviceName), Option(columnName))
+    Await.result(graph.mutateVertices(vertices, withWait = true), HttpRequestWaitingTime)
+    val res = graph.getVertices(vertices).map { vertices =>
+      PostProcess.verticesToJson(vertices)
+    }
+    val ret = Await.result(res, HttpRequestWaitingTime)
+    val fetched =[Seq[JsValue]]
+    for {
+      (d, f) <-
+    } yield {
+      (d \ "id") should be(f \ "id")
+      ((d \ "props") \ "age") should be((f \ "props") \ "age")
+    }
+  }
+  object VertexTestHelper {
+    def vertexQueryJson(serviceName: String, columnName: String, ids: Seq[Int]) = {
+      Json.parse(
+        s"""
+           |[
+           |{"serviceName": "$serviceName", "columnName": "$columnName", "ids": [${ids.mkString(",")}
+         ]}
+           |]
+       """.stripMargin)
+    }
+    def vertexInsertsPayload(serviceName: String, columnName: String, ids: Seq[Int]): Seq[JsValue] = {
+ { id =>
+        Json.obj("id" -> id, "props" -> randomProps, "timestamp" -> System.currentTimeMillis())
+      }
+    }
+    val vertexPropsKeys = List(
+      ("age", "int")
+    )
+    def randomProps() = {
+      (for {
+        (propKey, propType) <- vertexPropsKeys
+      } yield {
+        propKey -> Random.nextInt(100)
+      }).toMap
+    }
+  }
+package com.kakao.s2graph.core.Integrate
+import java.util.concurrent.TimeUnit
+import org.scalatest.BeforeAndAfterEach
+import play.api.libs.json.{JsObject, JsValue, Json}
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+class WeakLabelDeleteTest extends IntegrateCommon with BeforeAndAfterEach {
+  import TestUtil._
+  import WeakLabelDeleteHelper._
+  test("test weak consistency select") {
+    var result = getEdgesSync(query(0))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(4)
+    result = getEdgesSync(query(10))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(2)
+  }
+  test("test weak consistency delete") {
+    var result = getEdgesSync(query(0))
+    println(result)
+    /** expect 4 edges */
+    (result \ "results").as[List[JsValue]].size should be(4)
+    val edges = (result \ "results").as[List[JsObject]]
+    val edgesToStore = parser.toEdges(Json.toJson(edges), "delete")
+    val rets = graph.mutateEdges(edgesToStore, withWait = true)
+    Await.result(rets, Duration(20, TimeUnit.MINUTES))
+    /** expect noting */
+    result = getEdgesSync(query(0))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+    /** insert should be ignored */
+    val edgesToStore2 = parser.toEdges(Json.toJson(edges), "insert")
+    val rets2 = graph.mutateEdges(edgesToStore2, withWait = true)
+    Await.result(rets2, Duration(20, TimeUnit.MINUTES))
+    result = getEdgesSync(query(0))
+    (result \ "results").as[List[JsValue]].size should be(0)
+  }
+  test("test weak consistency deleteAll") {
+    val deletedAt = 100
+    var result = getEdgesSync(query(20, "in", testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(3)
+    val json = Json.arr(Json.obj("label" -> testLabelNameWeak,
+      "direction" -> "in", "ids" -> Json.arr("20"), "timestamp" -> deletedAt))
+    println(json)
+    deleteAllSync(json)
+    result = getEdgesSync(query(11, "out"))
+    (result \ "results").as[List[JsValue]].size should be(0)
+    result = getEdgesSync(query(12, "out"))
+    (result \ "results").as[List[JsValue]].size should be(0)
+    result = getEdgesSync(query(10, "out"))
+    // 10 -> out -> 20 should not be in result.
+    (result \ "results").as[List[JsValue]].size should be(1)
+    (result \\ "to").size should be(1)
+    (result \\ "to")[String] should be("21")
+    result = getEdgesSync(query(20, "in", testTgtColumnName))
+    println(result)
+    (result \ "results").as[List[JsValue]].size should be(0)
+    insertEdgesSync(bulkEdges(startTs = deletedAt + 1): _*)
+    result = getEdgesSync(query(20, "in", testTgtColumnName))
+    (result \ "results").as[List[JsValue]].size should be(3)
+  }
+  // called by each test, each
+  override def beforeEach = initTestData()
+  // called by start test, once
+  override def initTestData(): Unit = {
+    super.initTestData()
+    insertEdgesSync(bulkEdges(): _*)
+  }
+  object WeakLabelDeleteHelper {
+    def bulkEdges(startTs: Int = 0) = Seq(
+      toEdge(startTs + 1, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 2, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 11}"""),
+      toEdge(startTs + 3, "insert", "e", "0", "1", testLabelNameWeak, s"""{"time": 12}"""),
+      toEdge(startTs + 4, "insert", "e", "0", "2", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 5, "insert", "e", "10", "20", testLabelNameWeak, s"""{"time": 10}"""),
+      toEdge(startTs + 6, "insert", "e", "10", "21", testLabelNameWeak, s"""{"time": 11}"""),
+      toEdge(startTs + 7, "insert", "e", "11", "20", testLabelNameWeak, s"""{"time": 12}"""),
+      toEdge(startTs + 8, "insert", "e", "12", "20", testLabelNameWeak, s"""{"time": 13}""")
+    )
+    def query(id: Int, direction: String = "out", columnName: String = testColumnName) = Json.parse(
+      s"""
+        { "srcVertices": [
+          { "serviceName": "$testServiceName",
+            "columnName": "$columnName",
+            "id": ${id}
+           }],
+          "steps": [
+          [ {
+              "label": "${testLabelNameWeak}",
+              "direction": "${direction}",
+              "offset": 0,
+              "limit": 10,
+              "duplicate": "raw"
+            }
+          ]]
+        }""")
+  }
 import com.kakao.s2graph.core.types.{InnerValLike, InnerVal}
 import org.scalatest.{Matchers, FunSuite}
- * Created by shon on 5/30/15.
- */
 class JsonParserTest extends FunSuite with Matchers with TestCommon with JSONParser {
   import types.HBaseType._
 package com.kakao.s2graph.core
-import com.kakao.s2graph.core.OrderingUtil.MultiValueOrdering
+import com.kakao.s2graph.core.OrderingUtil._
 import org.scalatest.{FunSuite, Matchers}
 import play.api.libs.json.JsString
- * Created by hsleep( on 2015. 11. 5..
- */
 class OrderingUtilTest extends FunSuite with Matchers {
   test("test SeqMultiOrdering") {
     val jsLs: Seq[Seq[Any]] = Seq(
@@ -50,7 +47,7 @@ class OrderingUtilTest extends FunSuite with Matchers {
     val ascendingLs: Seq[Boolean] = Seq(false)
-    val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs))
+    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))
     resultJsLs.toString() should equal(sortedJsLs.toString())
@@ -74,7 +71,7 @@ class OrderingUtilTest extends FunSuite with Matchers {
     val ascendingLs: Seq[Boolean] = Seq(false, true)
-    val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs))
+    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))
     resultJsLs.toString() should equal(sortedJsLs.toString())
@@ -99,7 +96,7 @@ class OrderingUtilTest extends FunSuite with Matchers {
     val ascendingLs: Seq[Boolean] = Seq(true, true, false)
-    val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs))
+    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))
     resultJsLs.toString() should equal(sortedJsLs.toString())
@@ -126,7 +123,7 @@ class OrderingUtilTest extends FunSuite with Matchers {
     val ascendingLs: Seq[Boolean] = Seq(true, true, true, false)
-    val resultJsLs = jsLs.sorted(new TupleMultiOrdering[Any](ascendingLs))
+    val resultJsLs = jsLs.sorted(TupleMultiOrdering[Any](ascendingLs))
     resultJsLs.toString() should equal(sortedJsLs.toString())
 package com.kakao.s2graph.core
- import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.mysqls._
 //import com.kakao.s2graph.core.models._
@@ -9,13 +9,7 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.hbase.async.{PutRequest, KeyValue}
- * Created by shon on 6/1/15.
- */
 trait TestCommon {
   val ts = System.currentTimeMillis()
   val testServiceId = 1
   val testColumnId = 1
@@ -104,95 +98,95 @@ trait TestCommon {
   val idxPropsWithTsLsV2 = { idxProps => { case (k, v) => k -> InnerValLikeWithTs(v, ts) }
-//  def testOrder(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]],
-//                innerVals: Iterable[InnerValLike], skipHashBytes: Boolean = false)
-//               (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
-//                fromBytesFunc: Array[Byte] => HBaseSerializable) = {
-//    /** check if increasing target vertex id is ordered properly with same indexProps */
-//    val rets = for {
-//      idxProps <- idxPropsLs
-//    } yield {
-//        val head = createFunc(idxProps, innerVals.head)
-//        val start = head
-//        var prev = head
-//        val rets = for {
-//          innerVal <- innerVals.tail
-//        } yield {
-//            val current = createFunc(idxProps, innerVal)
-//            val bytes = current.bytes
-//            val decoded = fromBytesFunc(bytes)
-//            println(s"current: $current")
-//            println(s"decoded: $decoded")
-//            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
-//            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
-//            val (isSame, orderPreserved) = (current, decoded) match {
-//              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if ( =>
-//                /** _to is used in indexProps */
-//                ( == && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
-//              case _ =>
-//                (current == decoded, lessThan(currentBytes, prevBytes))
-//            }
-//            println(s"$current ${bytes.toList}")
-//            println(s"$prev ${prev.bytes.toList}")
-//            println(s"SerDe[$isSame], Order[$orderPreserved]")
-//            prev = current
-//            isSame && orderPreserved
-//          }
-//        rets.forall(x => x)
-//      }
-//    rets.forall(x => x)
-//  }
-//  def testOrderReverse(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], innerVals: Iterable[InnerValLike],
-//                       skipHashBytes: Boolean = false)
-//                      (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
-//                       fromBytesFunc: Array[Byte] => HBaseSerializable) = {
-//    /** check if increasing target vertex id is ordered properly with same indexProps */
-//    val rets = for {
-//      innerVal <- innerVals
-//    } yield {
-//        val head = createFunc(idxPropsLs.head, innerVal)
-//        val start = head
-//        var prev = head
-//        val rets = for {
-//          idxProps <- idxPropsLs.tail
-//        } yield {
-//            val current = createFunc(idxProps, innerVal)
-//            val bytes = current.bytes
-//            val decoded = fromBytesFunc(bytes)
-//            println(s"current: $current")
-//            println(s"decoded: $decoded")
-//            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
-//            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
-//            val (isSame, orderPreserved) = (current, decoded) match {
-//              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if ( =>
-//                /** _to is used in indexProps */
-//                ( == && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
-//              case _ =>
-//                (current == decoded, lessThan(currentBytes, prevBytes))
-//            }
-//            println(s"$current ${bytes.toList}")
-//            println(s"$prev ${prev.bytes.toList}")
-//            println(s"SerDe[$isSame], Order[$orderPreserved]")
-//            prev = current
-//            isSame && orderPreserved
-//          }
-//        rets.forall(x => x)
-//      }
-//    rets.forall(x => x)
-//  }
-//  def putToKeyValues(put: PutRequest) = {
-//    val ts = put.timestamp()
-//    for ((q, v) <- put.qualifiers().zip(put.values)) yield {
-//      new KeyValue(put.key(),, q, ts, v)
-//    }
-//  }
+  //
+  //  def testOrder(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]],
+  //                innerVals: Iterable[InnerValLike], skipHashBytes: Boolean = false)
+  //               (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
+  //                fromBytesFunc: Array[Byte] => HBaseSerializable) = {
+  //    /** check if increasing target vertex id is ordered properly with same indexProps */
+  //    val rets = for {
+  //      idxProps <- idxPropsLs
+  //    } yield {
+  //        val head = createFunc(idxProps, innerVals.head)
+  //        val start = head
+  //        var prev = head
+  //        val rets = for {
+  //          innerVal <- innerVals.tail
+  //        } yield {
+  //            val current = createFunc(idxProps, innerVal)
+  //            val bytes = current.bytes
+  //            val decoded = fromBytesFunc(bytes)
+  //            println(s"current: $current")
+  //            println(s"decoded: $decoded")
+  //
+  //            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
+  //            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
+  //            val (isSame, orderPreserved) = (current, decoded) match {
+  //              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if ( =>
+  //                /** _to is used in indexProps */
+  //                ( == && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
+  //              case _ =>
+  //                (current == decoded, lessThan(currentBytes, prevBytes))
+  //            }
+  //
+  //            println(s"$current ${bytes.toList}")
+  //            println(s"$prev ${prev.bytes.toList}")
+  //            println(s"SerDe[$isSame], Order[$orderPreserved]")
+  //            prev = current
+  //            isSame && orderPreserved
+  //          }
+  //        rets.forall(x => x)
+  //      }
+  //    rets.forall(x => x)
+  //  }
+  //  def testOrderReverse(idxPropsLs: Seq[Seq[(Byte, InnerValLike)]], innerVals: Iterable[InnerValLike],
+  //                       skipHashBytes: Boolean = false)
+  //                      (createFunc: (Seq[(Byte, InnerValLike)], InnerValLike) => HBaseSerializable,
+  //                       fromBytesFunc: Array[Byte] => HBaseSerializable) = {
+  //    /** check if increasing target vertex id is ordered properly with same indexProps */
+  //    val rets = for {
+  //      innerVal <- innerVals
+  //    } yield {
+  //        val head = createFunc(idxPropsLs.head, innerVal)
+  //        val start = head
+  //        var prev = head
+  //        val rets = for {
+  //          idxProps <- idxPropsLs.tail
+  //        } yield {
+  //            val current = createFunc(idxProps, innerVal)
+  //            val bytes = current.bytes
+  //            val decoded = fromBytesFunc(bytes)
+  //            println(s"current: $current")
+  //            println(s"decoded: $decoded")
+  //
+  //            val prevBytes = if (skipHashBytes) prev.bytes.drop(GraphUtil.bytesForMurMurHash) else prev.bytes
+  //            val currentBytes = if (skipHashBytes) bytes.drop(GraphUtil.bytesForMurMurHash) else bytes
+  //            val (isSame, orderPreserved) = (current, decoded) match {
+  //              case (c: v2.EdgeQualifier, d: v2.EdgeQualifier) if ( =>
+  //                /** _to is used in indexProps */
+  //                ( == && c.op == d.op, Bytes.compareTo(currentBytes, prevBytes) <= 0)
+  //              case _ =>
+  //                (current == decoded, lessThan(currentBytes, prevBytes))
+  //            }
+  //
+  //            println(s"$current ${bytes.toList}")
+  //            println(s"$prev ${prev.bytes.toList}")
+  //            println(s"SerDe[$isSame], Order[$orderPreserved]")
+  //            prev = current
+  //            isSame && orderPreserved
+  //          }
+  //
+  //        rets.forall(x => x)
+  //      }
+  //
+  //    rets.forall(x => x)
+  //  }
+  //
+  //
+  //  def putToKeyValues(put: PutRequest) = {
+  //    val ts = put.timestamp()
+  //    for ((q, v) <- put.qualifiers().zip(put.values)) yield {
+  //      new KeyValue(put.key(),, q, ts, v)
+  //    }
+  //  }
 import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
 import com.kakao.s2graph.core.mysqls._
+import scalikejdbc.AutoSession
 //import com.kakao.s2graph.core.models._
 import com.kakao.s2graph.core.types.{InnerVal, LabelWithDirection}
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 import scala.concurrent.ExecutionContext
@@ -16,9 +16,28 @@ trait TestCommonWithModels {
   import InnerVal._
   import types.HBaseType._
-  val config = ConfigFactory.load()
+  var graph: Graph = _
+  var config: Config = _
+  def initTests() = {
+    config = ConfigFactory.load()
+    graph = new Graph(config)(
+    implicit val session = AutoSession
+    deleteTestLabel()
+    deleteTestService()
+    createTestService()
+    createTestLabel()
+  }
+  def zkQuorum = config.getString("hbase.zookeeper.quorum")
+  def cluster = config.getString("hbase.zookeeper.quorum")
+  implicit val session = AutoSession
-  val zkQuorum = config.getString("hbase.zookeeper.quorum")
   val serviceName = "_test_service"
   val serviceNameV2 = "_test_service_v2"
   val columnName = "user_id"
@@ -31,7 +50,6 @@ trait TestCommonWithModels {
   val tgtColumnType = "string"
   val tgtColumnTypeV2 = "string"
-  val cluster = config.getString("hbase.zookeeper.quorum")
   val hTableName = "_test_cases"
   val preSplitSize = 0
   val labelName = "_test_label"
@@ -54,37 +72,29 @@ trait TestCommonWithModels {
   val consistencyLevel = "strong"
   val hTableTTL = None
-  val graph = new Graph(config)(
-  def initTests() = {
-    deleteTestLabel()
-    deleteTestService()
-    Thread.sleep(1000)
-    createTestService()
-    createTestLabel()
-  }
   def createTestService() = {
+    implicit val session = AutoSession
     Management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
     Management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
   def deleteTestService() = {
+    implicit val session = AutoSession
   def deleteTestLabel() = {
+    implicit val session = AutoSession
   def createTestLabel() = {
+    implicit val session = AutoSession
     Management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
       isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
@@ -98,31 +108,37 @@ trait TestCommonWithModels {
       isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
-  /** */
-  initTests()
+  def service = Service.findByName(serviceName, useCache = false).get
+  def serviceV2 = Service.findByName(serviceNameV2, useCache = false).get
+  def column = ServiceColumn.find(, columnName, useCache = false).get
+  def columnV2 = ServiceColumn.find(, columnNameV2, useCache = false).get
+  def tgtColumn = ServiceColumn.find(, tgtColumnName, useCache = false).get
+  def tgtColumnV2 = ServiceColumn.find(, tgtColumnNameV2, useCache = false).get
+  def label = Label.findByName(labelName, useCache = false).get
+  def labelV2 = Label.findByName(labelNameV2, useCache = false).get
+  def undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get
-  lazy val service = Service.findByName(serviceName, useCache = false).get
-  lazy val serviceV2 = Service.findByName(serviceNameV2, useCache = false).get
+  def undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get
-  lazy val column = ServiceColumn.find(, columnName, useCache = false).get
-  lazy val columnV2 = ServiceColumn.find(, columnNameV2, useCache = false).get
+  def dir = GraphUtil.directions("out")
-  lazy val tgtColumn = ServiceColumn.find(, tgtColumnName, useCache = false).get
-  lazy val tgtColumnV2 = ServiceColumn.find(, tgtColumnNameV2, useCache = false).get
+  def op = GraphUtil.operations("insert")
-  lazy val label = Label.findByName(labelName, useCache = false).get
-  lazy val labelV2 = Label.findByName(labelNameV2, useCache = false).get
+  def labelOrderSeq = LabelIndex.DefaultSeq
-  lazy val undirectedLabel = Label.findByName(undirectedLabelName, useCache = false).get
-  lazy val undirectedLabelV2 = Label.findByName(undirectedLabelNameV2, useCache = false).get
+  def labelWithDir = LabelWithDirection(, dir)
-  lazy val dir = GraphUtil.directions("out")
-  lazy val op = GraphUtil.operations("insert")
-  lazy val labelOrderSeq = LabelIndex.DefaultSeq
+  def labelWithDirV2 = LabelWithDirection(, dir)
-  lazy val labelWithDir = LabelWithDirection(, dir)
-  lazy val labelWithDirV2 = LabelWithDirection(, dir)
+  def queryParam = QueryParam(labelWithDir)
-  lazy val queryParam = QueryParam(labelWithDir)
-  lazy val queryParamV2 = QueryParam(labelWithDirV2)
+  def queryParamV2 = QueryParam(labelWithDirV2)
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/models/ModelTest.scala
 import com.kakao.s2graph.core.mysqls.{Label, Model}
 import com.kakao.s2graph.core.{TestCommonWithModels, TestCommon, Graph}
 import com.typesafe.config.ConfigFactory
-import org.scalatest.{FunSuite, Matchers}
+import org.scalatest.{BeforeAndAfterAll, Sequential, FunSuite, Matchers}
 import scala.concurrent.ExecutionContext
- * Created by shon on 5/12/15.
- */
-class ModelTest extends FunSuite with Matchers with TestCommonWithModels {
+class ModelTest extends FunSuite with Matchers with TestCommonWithModels with BeforeAndAfterAll {
+  override def beforeAll(): Unit = {
+    initTests()
+  }
+  override def afterAll(): Unit = {
+    graph.shutdown()
+  }
-//  val serviceName = "testService"
-//  val newServiceName = "newTestService"
-//  val cluster = "localhost"
-//  val hbaseTableName = "s2graph-dev"
-//  val columnName = "user_id"
-//  val columnType = "long"
-//  val labelName = "model_test_label"
-//  val newLabelName = "new_model_test_label"
-//  val columnMetaName = "is_valid_user"
-//  val labelMetaName = "is_hidden"
-//  val hbaseTableTTL = -1
-//  val id = 1
-//  val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster,
-//    "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1))
-//  val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" ->,
-//    "columnName" -> columnName, "columnType" -> columnType))
-//  val columnMeta = HColumnMeta(Map("id" -> id, "columnId" ->, "name" -> columnMetaName,  "seq" -> 1.toByte))
-//  val label = HLabel(Map("id" -> id, "label" -> labelName,
-//    "srcServiceId" ->, "srcColumnName" -> columnName, "srcColumnType" -> columnType,
-//    "tgtServiceId" ->, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType,
-//    "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" ->,
-//    "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1
-//  ))
-//  val labelMeta = HLabelMeta(Map("id" -> id, "labelId" ->, "name" -> labelMetaName, "seq" -> 1.toByte,
-//    "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false))
-//  val labelIndex = HLabelIndex(Map("id" -> id, "labelId" ->, "seq" -> 1.toByte,
-//    "metaSeqs" -> "0", "formular" -> "none"))
+  //  val serviceName = "testService"
+  //  val newServiceName = "newTestService"
+  //  val cluster = "localhost"
+  //  val hbaseTableName = "s2graph-dev"
+  //  val columnName = "user_id"
+  //  val columnType = "long"
+  //  val labelName = "model_test_label"
+  //  val newLabelName = "new_model_test_label"
+  //  val columnMetaName = "is_valid_user"
+  //  val labelMetaName = "is_hidden"
+  //  val hbaseTableTTL = -1
+  //  val id = 1
+  //
+  //  val service = HService(Map("id" -> id, "serviceName" -> serviceName, "cluster" -> cluster,
+  //    "hbaseTableName" -> hbaseTableName, "preSplitSize" -> 0, "hbaseTableTTL" -> -1))
+  //  val serviceColumn = HServiceColumn(Map("id" -> id, "serviceId" ->,
+  //    "columnName" -> columnName, "columnType" -> columnType))
+  //  val columnMeta = HColumnMeta(Map("id" -> id, "columnId" ->, "name" -> columnMetaName,  "seq" -> 1.toByte))
+  //  val label = HLabel(Map("id" -> id, "label" -> labelName,
+  //    "srcServiceId" ->, "srcColumnName" -> columnName, "srcColumnType" -> columnType,
+  //    "tgtServiceId" ->, "tgtColumnName" -> columnName, "tgtColumnType" -> columnType,
+  //    "isDirected" -> true, "serviceName" -> service.serviceName, "serviceId" ->,
+  //    "consistencyLevel" -> "weak", "hTableName" -> hbaseTableName, "hTableTTL" -> -1
+  //  ))
+  //  val labelMeta = HLabelMeta(Map("id" -> id, "labelId" ->, "name" -> labelMetaName, "seq" -> 1.toByte,
+  //    "defaultValue" -> false, "dataType" -> "boolean", "usedInIndex" -> false))
+  //  val labelIndex = HLabelIndex(Map("id" -> id, "labelId" ->, "seq" -> 1.toByte,
+  //    "metaSeqs" -> "0", "formular" -> "none"))
   test("test Label.findByName") {
     val labelOpt = Label.findByName(labelName, useCache = false)
@@ -66,66 +70,66 @@ class ModelTest extends FunSuite with Matchers with TestCommonWithModels {
     val tgtColumn = labelOpt.get.tgtService
-//  test("test create") {
-//    service.create()
-//    HService.findByName(serviceName, useCache = false) == Some(service)
-//    serviceColumn.create()
-//    HServiceColumn.findsByServiceId(, useCache = false).headOption == Some(serviceColumn)
-//    columnMeta.create()
-//    HColumnMeta.findByName(, columnMetaName, useCache = false) == Some(columnMeta)
-//    label.create()
-//    HLabel.findByName(labelName, useCache = false) == Some(label)
-//    labelMeta.create()
-//    HLabelMeta.findByName(, labelMetaName, useCache = false) == Some(labelMeta)
-//    labelIndex.create()
-//    HLabelIndex.findByLabelIdAll(, useCache = false).headOption == Some(labelIndex)
-//  }
-//  test("test update") {
-//    service.update("cluster", "...")
-//    HService.findById(, useCache = false).cluster == "..."
-//    service.update("serviceName", newServiceName)
-//    assert(HService.findByName(serviceName, useCache = false) == None)
-//    HService.findByName(newServiceName, useCache = false).map { service => ==}
-//    label.update("label", newLabelName)
-//    HLabel.findById(, useCache = false).label == "newLabelName"
-//    label.update("consistencyLevel", "strong")
-//    HLabel.findById(, useCache = false).consistencyLevel == "strong" &&
-//    HLabel.findByName(newLabelName).isDefined &&
-//    HLabel.findByName(labelName) == None
-//  }
-//  test("test read by index") {
-//    val labels = HLabel.findBySrcServiceId(, useCache = false)
-//    val idxs = HLabelIndex.findByLabelIdAll(, useCache = false)
-//    labels.length == 1 &&
-//    labels.head == label
-//    idxs.length == 1 &&
-//    idxs.head == labelIndex
-//  }
-//  test("test delete") {
-////    HLabel.findByName(labelName).foreach { label =>
-////      label.deleteAll()
-////    }
-//    HLabel.findByName(newLabelName).foreach { label =>
-//      label.deleteAll()
-//    }
-//    HLabelMeta.findAllByLabelId(, useCache = false).isEmpty &&
-//    HLabelIndex.findByLabelIdAll(, useCache = false).isEmpty
-//    service.deleteAll()
-//  }
+  //  test("test create") {
+  //    service.create()
+  //    HService.findByName(serviceName, useCache = false) == Some(service)
+  //
+  //    serviceColumn.create()
+  //    HServiceColumn.findsByServiceId(, useCache = false).headOption == Some(serviceColumn)
+  //
+  //    columnMeta.create()
+  //    HColumnMeta.findByName(, columnMetaName, useCache = false) == Some(columnMeta)
+  //
+  //    label.create()
+  //    HLabel.findByName(labelName, useCache = false) == Some(label)
+  //
+  //    labelMeta.create()
+  //    HLabelMeta.findByName(, labelMetaName, useCache = false) == Some(labelMeta)
+  //
+  //    labelIndex.create()
+  //    HLabelIndex.findByLabelIdAll(, useCache = false).headOption == Some(labelIndex)
+  //  }
+  //
+  //  test("test update") {
+  //    service.update("cluster", "...")
+  //    HService.findById(, useCache = false).cluster == "..."
+  //
+  //    service.update("serviceName", newServiceName)
+  //    assert(HService.findByName(serviceName, useCache = false) == None)
+  //    HService.findByName(newServiceName, useCache = false).map { service => ==}
+  //
+  //    label.update("label", newLabelName)
+  //    HLabel.findById(, useCache = false).label == "newLabelName"
+  //
+  //    label.update("consistencyLevel", "strong")
+  //    HLabel.findById(, useCache = false).consistencyLevel == "strong" &&
+  //    HLabel.findByName(newLabelName).isDefined &&
+  //    HLabel.findByName(labelName) == None
+  //
+  //  }
+  //  test("test read by index") {
+  //    val labels = HLabel.findBySrcServiceId(, useCache = false)
+  //    val idxs = HLabelIndex.findByLabelIdAll(, useCache = false)
+  //    labels.length == 1 &&
+  //    labels.head == label
+  //    idxs.length == 1 &&
+  //    idxs.head == labelIndex
+  //  }
+  //  test("test delete") {
+  ////    HLabel.findByName(labelName).foreach { label =>
+  ////      label.deleteAll()
+  ////    }
+  //    HLabel.findByName(newLabelName).foreach { label =>
+  //      label.deleteAll()
+  //    }
+  //    HLabelMeta.findAllByLabelId(, useCache = false).isEmpty &&
+  //    HLabelIndex.findByLabelIdAll(, useCache = false).isEmpty
+  //
+  //    service.deleteAll()
+  //  }
-//  test("test labelIndex") {
-//    println(HLabelIndex.findByLabelIdAll(1))
-//  }
+  //  test("test labelIndex") {
+  //    println(HLabelIndex.findByLabelIdAll(1))
+  //  }
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import scalikejdbc._
-  * Created by hsleep on 2015. 11. 30..
-  */
 class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   val Ttl = 2
   override def beforeAll(): Unit = {
@@ -19,37 +16,7 @@ class ExperimentSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val props = new Properties()
     props.setProperty("cache.ttl.seconds", Ttl.toString)
-    /*
-CREATE TABLE `experiments` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `service_id` integer NOT NULL,
-  `service_name` varchar(128) NOT NULL,
-  `name` varchar(64) NOT NULL,
-  `description` varchar(255) NOT NULL,
-  `experiment_type` varchar(8) NOT NULL DEFAULT 'u',
-  `total_modular` int NOT NULL DEFAULT 100,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_service_id_name` (`service_id`, `name`)
-CREATE TABLE `buckets` (
-  `id` integer NOT NULL AUTO_INCREMENT,
-  `experiment_id` integer NOT NULL,
-  `uuid_mods` varchar(64) NOT NULL,
-  `traffic_ratios` varchar(64) NOT NULL,
-  `http_verb` varchar(8) NOT NULL,
-  `api_path` text NOT NULL,
-  `uuid_key` varchar(128),
-  `uuid_placeholder` varchar(64),
-  `request_body` text NOT NULL,
-  `timeout` int NOT NULL DEFAULT 1000,
-  `impression_id` varchar(64) NOT NULL,
-  `is_graph_query` tinyint NOT NULL DEFAULT 1,
-  PRIMARY KEY (`id`),
-  UNIQUE KEY `ux_impression_id` (`impression_id`),
-  INDEX `idx_experiment_id` (`experiment_id`),
-  INDEX `idx_impression_id` (`impression_id`)
-     */
     implicit val session = AutoSession
     sql"""DELETE FROM buckets""".update().apply()
     sql"""DELETE FROM experiments""".update().apply()
@@ -58,6 +25,7 @@ CREATE TABLE `buckets` (
     sql"""INSERT INTO
            buckets(experiment_id, modular, http_verb, api_path, request_body, impression_id)
            VALUES($expId, "1~100", "POST", "/a/b/c", "None", "imp1")""".update().apply()
   "Experiment" should "find bucket list" in {
@@ -70,7 +38,7 @@ CREATE TABLE `buckets` (
   it should "update bucket list after cache ttl time" in {
-     Experiment.findBy(1, "exp1").foreach { exp =>
+    Experiment.findBy(1, "exp1").foreach { exp =>
       val bucket = exp.buckets.head
       bucket.impressionId should equal("imp1")
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
 class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
   // dummy data for dummy edge
   import HBaseType.{VERSION1, VERSION2}
   val ts = System.currentTimeMillis()
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseQueryBuilderTest.scala
 import com.kakao.s2graph.core.Graph
+import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.hbase.util.Bytes
 import org.hbase.async.GetRequest
 import org.scalatest.{FunSuite, Matchers}
-  * Created by hsleep( on 2015. 11. 9..
-  */
+import scala.concurrent.ExecutionContext
 class AsynchbaseQueryBuilderTest extends FunSuite with Matchers {
   val dummyRequests = {
     for {
       id <- 0 until 1000
@@ -18,7 +19,8 @@ class AsynchbaseQueryBuilderTest extends FunSuite with Matchers {
-  val config = Graph.DefaultConfig
+  implicit val ec =
+  val config = ConfigFactory.load()
   val graph = new Graph(config)
   val qb = new AsynchbaseQueryBuilder([AsynchbaseStorage])
 package com.kakao.s2graph.core.types
+import com.kakao.s2graph.core.TestCommonWithModels
 import com.kakao.s2graph.core.types._
 import org.apache.hadoop.hbase.util.{Bytes, OrderedBytes, SimplePositionedByteRange}
 import org.scalatest.{Matchers, FunSuite}
 import play.api.libs.json.Json
- * Created by shon on 5/28/15.
- */
-class InnerValTest extends FunSuite with Matchers {
+class InnerValTest extends FunSuite with Matchers with TestCommonWithModels {
+  initTests()
   import HBaseType.{VERSION2, VERSION1}
   val decimals = List(
@@ -44,52 +44,52 @@ class InnerValTest extends FunSuite with Matchers {
     for {
       innerVal <- ranges
     }  {
-        val bytes = innerVal.bytes
-        val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version)
-        innerVal == decoded shouldBe true
-        bytes.length == numOfBytesUsed shouldBe true
-      }
+      val bytes = innerVal.bytes
+      val (decoded, numOfBytesUsed) = InnerVal.fromBytes(bytes, 0, bytes.length, version)
+      innerVal == decoded shouldBe true
+      bytes.length == numOfBytesUsed shouldBe true
+    }
-//  test("big decimal") {
-//    for {
-//      version <- List(VERSION2, VERSION1)
-//    } {
-//      val innerVals = { num => InnerVal.withNumber(num, version)}
-//      testEncodeDecode(innerVals, version)
-//    }
-//  }
-//  test("text") {
-//    for {
-//      version <- List(VERSION2)
-//    } {
-//      val innerVals = { t => InnerVal.withStr(t, version) }
-//      testEncodeDecode(innerVals, version)
-//    }
-//  }
-//  test("string") {
-//    for {
-//      version <- List(VERSION2, VERSION1)
-//    } {
-//      val innerVals = { t => InnerVal.withStr(t, version) }
-//      testEncodeDecode(innerVals, version)
-//    }
-//  }
-//  test("blob") {
-//    for {
-//      version <- List(VERSION2)
-//    } {
-//      val innerVals = { t => InnerVal.withBlob(t, version) }
-//      testEncodeDecode(innerVals, version)
-//    }
-//  }
-//  test("boolean") {
-//    for {
-//      version <- List(VERSION2, VERSION1)
-//    } {
-//      val innerVals = { t => InnerVal.withBoolean(t, version) }
-//      testEncodeDecode(innerVals, version)
-//    }
-//  }
+  //  test("big decimal") {
+  //    for {
+  //      version <- List(VERSION2, VERSION1)
+  //    } {
+  //      val innerVals = { num => InnerVal.withNumber(num, version)}
+  //      testEncodeDecode(innerVals, version)
+  //    }
+  //  }
+  //  test("text") {
+  //    for {
+  //      version <- List(VERSION2)
+  //    } {
+  //      val innerVals = { t => InnerVal.withStr(t, version) }
+  //      testEncodeDecode(innerVals, version)
+  //    }
+  //  }
+  //  test("string") {
+  //    for {
+  //      version <- List(VERSION2, VERSION1)
+  //    } {
+  //      val innerVals = { t => InnerVal.withStr(t, version) }
+  //      testEncodeDecode(innerVals, version)
+  //    }
+  //  }
+  //  test("blob") {
+  //    for {
+  //      version <- List(VERSION2)
+  //    } {
+  //      val innerVals = { t => InnerVal.withBlob(t, version) }
+  //      testEncodeDecode(innerVals, version)
+  //    }
+  //  }
+  //  test("boolean") {
+  //    for {
+  //      version <- List(VERSION2, VERSION1)
+  //    } {
+  //      val innerVals = { t => InnerVal.withBoolean(t, version) }
+  //      testEncodeDecode(innerVals, version)
+  //    }
+  //  }
   test("korean") {
     val small = InnerVal.withStr("가", VERSION2)
     val large = InnerVal.withStr("나", VERSION2)
@@ -99,32 +99,32 @@ class InnerValTest extends FunSuite with Matchers {
     println (Bytes.compareTo(smallBytes, largeBytes))
-//  test("innerVal") {
-//    val srcVal = InnerVal.withLong(44391298, VERSION2)
-//    val srcValV1 = InnerVal.withLong(44391298, VERSION1)
-//    val tgtVal = InnerVal.withLong(7295564, VERSION2)
-//    val a = VertexId(0, srcVal)
-//    val b = SourceVertexId(0, srcVal)
-//    val c = TargetVertexId(0, srcVal)
-//    val aa = VertexId(0, srcValV1)
-//    val bb = SourceVertexId(0, srcValV1)
-//    val cc = TargetVertexId(0, srcValV1)
-//    println(a.bytes.toList)
-//    println(b.bytes.toList)
-//    println(c.bytes.toList)
-//    println(aa.bytes.toList)
-//    println(bb.bytes.toList)
-//    println(cc.bytes.toList)
-//  }
-//  test("aa") {
-//    val bytes = InnerVal.withLong(Int.MaxValue, VERSION2).bytes
-//    val pbr = new SimplePositionedByteRange(bytes)
-//    pbr.setOffset(1)
-//    println(pbr.getPosition)
-//    val num = OrderedBytes.decodeNumericAsBigDecimal(pbr)
-//    println(pbr.getPosition)
-//    true
-//  }
+  //  test("innerVal") {
+  //    val srcVal = InnerVal.withLong(44391298, VERSION2)
+  //    val srcValV1 = InnerVal.withLong(44391298, VERSION1)
+  //    val tgtVal = InnerVal.withLong(7295564, VERSION2)
+  //
+  //    val a = VertexId(0, srcVal)
+  //    val b = SourceVertexId(0, srcVal)
+  //    val c = TargetVertexId(0, srcVal)
+  //    val aa = VertexId(0, srcValV1)
+  //    val bb = SourceVertexId(0, srcValV1)
+  //    val cc = TargetVertexId(0, srcValV1)
+  //    println(a.bytes.toList)
+  //    println(b.bytes.toList)
+  //    println(c.bytes.toList)
+  //
+  //    println(aa.bytes.toList)
+  //    println(bb.bytes.toList)
+  //    println(cc.bytes.toList)
+  //  }
+  //  test("aa") {
+  //    val bytes = InnerVal.withLong(Int.MaxValue, VERSION2).bytes
+  //    val pbr = new SimplePositionedByteRange(bytes)
+  //    pbr.setOffset(1)
+  //    println(pbr.getPosition)
+  //    val num = OrderedBytes.decodeNumericAsBigDecimal(pbr)
+  //    println(pbr.getPosition)
+  //    true
+  //  }
+import java.util.concurrent.Executors
+import actors.QueueActor
+import com.kakao.s2graph.core.utils.logger
+import com.kakao.s2graph.core.{ExceptionHandler, Graph}
+import config.Config
+import controllers.{AdminController, ApplicationController}
+import play.api.Application
+import play.api.mvc.{WithFilters, _}
+import play.filters.gzip.GzipFilter
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
+object Global extends WithFilters(new GzipFilter()) {
+  var s2graph: Graph = _
+  var s2parser: RequestParser = _
+  // Application entry point
+  override def onStart(app: Application) {
+    ApplicationController.isHealthy = false
+    val numOfThread = Runtime.getRuntime.availableProcessors()
+    val threadPool = Executors.newFixedThreadPool(numOfThread)
+    val ec = ExecutionContext.fromExecutor(threadPool)
+    val config = Config.conf.underlying
+    // init s2graph with config
+    s2graph = new Graph(config)(ec)
+    s2parser = new RequestParser(s2graph.config) // merged config
+    QueueActor.init(s2graph)
+    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
+      ExceptionHandler.apply(config)
+    }
+    val defaultHealthOn = Config.conf.getBoolean("").getOrElse(true)
+    ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
+    ApplicationController.isHealthy = defaultHealthOn
+"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
+  }
+  override def onStop(app: Application) {
+    QueueActor.shutdown()
+    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
+      ExceptionHandler.shutdown()
+    }
+    /**
+     * shutdown hbase client for flush buffers.
+     */
+    s2graph.shutdown()
+  }
+  override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
+    logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex)
+    Future.successful(Results.InternalServerError)
+  }
+  override def onHandlerNotFound(request: RequestHeader): Future[Result] = {
+    logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}")
+    Future.successful(Results.NotFound)
+  }
+  override def onBadRequest(request: RequestHeader, error: String): Future[Result] = {
+    logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error")
+    Future.successful(Results.BadRequest(error))
+  }
+package actors
+import java.util.concurrent.TimeUnit
+import actors.Protocol.FlushAll
+import com.kakao.s2graph.core.ExceptionHandler._
+import com.kakao.s2graph.core._
+import com.kakao.s2graph.core.utils.logger
+import config.Config
+import play.api.Play.current
+import play.api.libs.concurrent.Akka
+import scala.collection.mutable
+import scala.concurrent.duration.Duration
+ * Created by shon on 9/2/15.
+ */
+object Protocol {
+  case object Flush
+  case object FlushAll
+object QueueActor {
+  /** we are throttling down here so fixed number of actor to constant */
+  var router: ActorRef = _
+  //    Akka.system.actorOf(props(), name = "queueActor")
+  def init(s2: Graph) = {
+    router = Akka.system.actorOf(props(s2))
+  }
+  def shutdown() = {
+    router ! FlushAll
+    Akka.system.shutdown()
+    Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
+  }
+  def props(s2: Graph): Props = Props(classOf[QueueActor], s2)
+class QueueActor(s2: Graph) extends Actor with ActorLogging {
+  import Protocol._
+  implicit val ec = context.system.dispatcher
+  //  logger.error(s"QueueActor: $self")
+  val queue = mutable.Queue.empty[GraphElement]
+  var queueSize = 0L
+  val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE
+  val timeUnitInMillis = 10
+  val rateLimitTimeStep = 1000 / timeUnitInMillis
+  val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep
+  context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush)
+  override def receive: Receive = {
+    case element: GraphElement =>
+      if (queueSize > maxQueueSize) {
+        ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
+      } else {
+        queueSize += 1L
+        queue.enqueue(element)
+      }
+    case Flush =>
+      val elementsToFlush =
+        if (queue.size < rateLimit) queue.dequeueAll(_ => true)
+        else (0 until rateLimit).map(_ => queue.dequeue())
+      val flushSize = elementsToFlush.size
+      queueSize -= elementsToFlush.length
+      s2.mutateElements(elementsToFlush)
+      if (flushSize > 0) {
+"flush: $flushSize, $queueSize")
+      }
+    case FlushAll =>
+      s2.mutateElements(queue)
+      context.stop(self)
+    case _ => logger.error("unknown protocol")
+  }
+package config
+import play.api.Play
+object Config {
+  // HBASE
+  lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost")
+  lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort
+  lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000)
+  lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3)
+  // PHASE
+  lazy val PHASE = conf.getString("phase").getOrElse("dev")
+  lazy val conf = Play.current.configuration
+  // CACHE
+  lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600)
+  lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000)
+  //KAFKA
+  lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("").getOrElse("localhost")
+  lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0)
+  lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
+  lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
+  lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
+  // is query or write
+  lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)
+  lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true)
+  // query limit per step
+  lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300)
+  // local queue actor
+  lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("").getOrElse(10000)
+  lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("").getOrElse(1000)
+package config
+ * Created by hsleep( on 15. 9. 3..
+ */
+object CounterConfig {
+  // kafka
+  lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}"
+  lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}"