[7/7] incubator-s2graph git commit: [S2GRAPH-7] Abstract common codes for rest project into s2core

[S2GRAPH-7] Abstract common codes for rest project into s2core

  Move PostProcess.scala to s2core from root project
  Move RequestParser from Root to s2core
  Add RestCaller to abstract over the HTTP layer
  Move root project to s2rest_play
  Move Test from root to s2core


diff --git a/CHANGES b/CHANGES
index 11a40e3..3575cd8 100644
@@ -1,5 +1,4 @@
- Change Log 
+ Change Log
 Release 0.12.1 - unreleased
@@ -10,8 +9,9 @@ Release 0.12.1 - unreleased
     S2GRAPH-5: Add Apache RAT to valid LICENSE errors. (Committed by DOYUNG YOON).
+    S2GRAPH-7: Abstract common codes for rest project into s2core. (Committed by daewon).
diff --git a/app/Bootstrap.scala b/app/Bootstrap.scala
deleted file mode 100644
index 1879d90..0000000
--- a/app/Bootstrap.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-import java.util.concurrent.Executors
-import actors.QueueActor
-import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{ExceptionHandler, Graph}
-import config.Config
-import controllers.{AdminController, ApplicationController}
-import play.api.Application
-import play.api.mvc.{WithFilters, _}
-import play.filters.gzip.GzipFilter
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.Try
-object Global extends WithFilters(new GzipFilter()) {
-  var s2graph: Graph = _
-  // Application entry point
-  override def onStart(app: Application) {
-    ApplicationController.isHealthy = false
-    val numOfThread = Runtime.getRuntime.availableProcessors()
-    val threadPool = Executors.newFixedThreadPool(numOfThread)
-    val ec = ExecutionContext.fromExecutor(threadPool)
-    val config = Config.conf.underlying
-    // init s2graph with config
-    s2graph = new Graph(config)(ec)
-    QueueActor.init(s2graph)
-    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
-      ExceptionHandler.apply(config)
-    }
-    val defaultHealthOn = Config.conf.getBoolean("").getOrElse(true)
-    ApplicationController.deployInfo = Try(Source.fromFile("./release_info").mkString("")).recover { case _ => "release info not found\n" }.get
-    AdminController.loadCacheInner()
-    ApplicationController.isHealthy = defaultHealthOn
-"starts with num of thread: $numOfThread, ${threadPool.getClass.getSimpleName}")
-  }
-  override def onStop(app: Application) {
-    QueueActor.shutdown()
-    if (Config.IS_WRITE_SERVER && Config.KAFKA_PRODUCER_POOL_SIZE > 0) {
-      ExceptionHandler.shutdown()
-    }
-    /**
-     * shutdown hbase client for flush buffers.
-     */
-    s2graph.shutdown()
-  }
-  override def onError(request: RequestHeader, ex: Throwable): Future[Result] = {
-    logger.error(s"onError => ip:${request.remoteAddress}, request:${request}", ex)
-    Future.successful(Results.InternalServerError)
-  }
-  override def onHandlerNotFound(request: RequestHeader): Future[Result] = {
-    logger.error(s"onHandlerNotFound => ip:${request.remoteAddress}, request:${request}")
-    Future.successful(Results.NotFound)
-  }
-  override def onBadRequest(request: RequestHeader, error: String): Future[Result] = {
-    logger.error(s"onBadRequest => ip:${request.remoteAddress}, request:$request, error:$error")
-    Future.successful(Results.BadRequest(error))
-  }
diff --git a/app/actors/QueueActor.scala b/app/actors/QueueActor.scala
deleted file mode 100644
index 74bc65d..0000000
--- a/app/actors/QueueActor.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package actors
-import java.util.concurrent.TimeUnit
-import actors.Protocol.FlushAll
-import com.kakao.s2graph.core.ExceptionHandler._
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import play.api.Play.current
-import play.api.libs.concurrent.Akka
-import scala.collection.mutable
-import scala.concurrent.duration.Duration
- * Created by shon on 9/2/15.
- */
-object Protocol {
-  case object Flush
-  case object FlushAll
-object QueueActor {
-  /** we are throttling down here so fixed number of actor to constant */
-  var router: ActorRef = _
-  //    Akka.system.actorOf(props(), name = "queueActor")
-  def init(s2: Graph) = {
-    router = Akka.system.actorOf(props(s2))
-  }
-  def shutdown() = {
-    router ! FlushAll
-    Akka.system.shutdown()
-    Thread.sleep(Config.ASYNC_HBASE_CLIENT_FLUSH_INTERVAL * 2)
-  }
-  def props(s2: Graph): Props = Props(classOf[QueueActor], s2)
-class QueueActor(s2: Graph) extends Actor with ActorLogging {
-  import Protocol._
-  implicit val ec = context.system.dispatcher
-  //  logger.error(s"QueueActor: $self")
-  val queue = mutable.Queue.empty[GraphElement]
-  var queueSize = 0L
-  val maxQueueSize = Config.LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE
-  val timeUnitInMillis = 10
-  val rateLimitTimeStep = 1000 / timeUnitInMillis
-  val rateLimit = Config.LOCAL_QUEUE_ACTOR_RATE_LIMIT / rateLimitTimeStep
-  context.system.scheduler.schedule(Duration.Zero, Duration(timeUnitInMillis, TimeUnit.MILLISECONDS), self, Flush)
-  override def receive: Receive = {
-    case element: GraphElement =>
-      if (queueSize > maxQueueSize) {
-        ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_FAIL_TOPIC, element, None))
-      } else {
-        queueSize += 1L
-        queue.enqueue(element)
-      }
-    case Flush =>
-      val elementsToFlush =
-        if (queue.size < rateLimit) queue.dequeueAll(_ => true)
-        else (0 until rateLimit).map(_ => queue.dequeue())
-      val flushSize = elementsToFlush.size
-      queueSize -= elementsToFlush.length
-      s2.mutateElements(elementsToFlush)
-      if (flushSize > 0) {
-"flush: $flushSize, $queueSize")
-      }
-    case FlushAll =>
-      s2.mutateElements(queue)
-      context.stop(self)
-    case _ => logger.error("unknown protocol")
-  }
diff --git a/app/config/Config.scala b/app/config/Config.scala
deleted file mode 100644
index 98b87c5..0000000
--- a/app/config/Config.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-package config
-import play.api.Play
-object Config {
-  // HBASE
-  lazy val HBASE_ZOOKEEPER_QUORUM = conf.getString("hbase.zookeeper.quorum").getOrElse("localhost")
-  lazy val ASYNC_HBASE_CLIENT_FLUSH_INTERVAL = conf.getInt("async.hbase.client.flush.interval").getOrElse(1000).toShort
-  lazy val RPC_TIMEOUT = conf.getInt("hbase.client.operation.timeout").getOrElse(1000)
-  lazy val MAX_ATTEMPT = conf.getInt("hbase.client.operation.maxAttempt").getOrElse(3)
-  // PHASE
-  lazy val PHASE = conf.getString("phase").getOrElse("dev")
-  lazy val conf = Play.current.configuration
-  // CACHE
-  lazy val CACHE_TTL_SECONDS = conf.getInt("cache.ttl.seconds").getOrElse(600)
-  lazy val CACHE_MAX_SIZE = conf.getInt("cache.max.size").getOrElse(10000)
-  //KAFKA
-  lazy val KAFKA_METADATA_BROKER_LIST = conf.getString("").getOrElse("localhost")
-  lazy val KAFKA_PRODUCER_POOL_SIZE = conf.getInt("kafka.producer.pool.size").getOrElse(0)
-  lazy val KAFKA_LOG_TOPIC = s"s2graphIn${PHASE}"
-  lazy val KAFKA_LOG_TOPIC_ASYNC = s"s2graphIn${PHASE}Async"
-  lazy val KAFKA_FAIL_TOPIC = s"s2graphIn${PHASE}Failed"
-  // is query or write
-  lazy val IS_QUERY_SERVER = conf.getBoolean("is.query.server").getOrElse(true)
-  lazy val IS_WRITE_SERVER = conf.getBoolean("is.write.server").getOrElse(true)
-  // query limit per step
-  lazy val QUERY_HARD_LIMIT = conf.getInt("query.hard.limit").getOrElse(300)
-  // local queue actor
-  lazy val LOCAL_QUEUE_ACTOR_MAX_QUEUE_SIZE = conf.getInt("").getOrElse(10000)
-  lazy val LOCAL_QUEUE_ACTOR_RATE_LIMIT = conf.getInt("").getOrElse(1000)
diff --git a/app/config/CounterConfig.scala b/app/config/CounterConfig.scala
deleted file mode 100644
index 2569d55..0000000
--- a/app/config/CounterConfig.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package config
- * Created by hsleep on 15. 9. 3..
- */
-object CounterConfig {
-  // kafka
-  lazy val KAFKA_TOPIC_COUNTER = s"s2counter-${Config.PHASE}"
-  lazy val KAFKA_TOPIC_COUNTER_TRX = s"s2counter-trx-${Config.PHASE}"
diff --git a/app/controllers/AdminController.scala b/app/controllers/AdminController.scala
deleted file mode 100644
index b31f01f..0000000
--- a/app/controllers/AdminController.scala
+++ /dev/null
@@ -1,417 +0,0 @@
-package controllers
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.utils.logger
-import play.api.mvc
-import play.api.mvc.{Action, Controller}
-import play.api.libs.json._
-import play.api.libs.functional.syntax._
-import scala.util.{Failure, Success, Try}
-object AdminController extends Controller with RequestParser {
-  import ApplicationController._
-  /**
-   * admin message formatter
-   * @tparam T
-   */
-  trait AdminMessageFormatter[T] {
-    def toJson(msg: T): JsValue
-  }
-  object AdminMessageFormatter {
-    implicit def jsValueToJson[T <: JsValue] = new AdminMessageFormatter[T] {
-      def toJson(js: T) = js
-    }
-    implicit val stringToJson = new AdminMessageFormatter[String] {
-      def toJson(js: String) = Json.obj("message" -> js)
-    }
-  }
-  def format[T: AdminMessageFormatter](f: JsValue => play.mvc.Result)(message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    f(formatter.toJson(message))
-  }
-  /**
-   * ok response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def ok[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    Ok(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-  /**
-   * bad request response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def bad[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    BadRequest(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-  /**
-   * not found response
-   * @param message
-   * @tparam T
-   * @return
-   */
-  def notFound[T: AdminMessageFormatter](message: T) = {
-    val formatter = implicitly[AdminMessageFormatter[T]]
-    NotFound(formatter.toJson(message)).as(applicationJsonHeader)
-  }
-  private[AdminController] def tryResponse[T, R: AdminMessageFormatter](res: Try[T])(callback: T => R): mvc.Result = res match {
-    case Success(m) =>
-      val ret = callback(m)
-      ok(ret)
-    case Failure(error) =>
-      logger.error(error.getMessage, error)
-      error match {
-        case JsResultException(e) => bad(JsError.toFlatJson(e))
-        case _ => bad(error.getMessage)
-      }
-  }
-  def optionResponse[T, R: AdminMessageFormatter](res: Option[T])(callback: T => R): mvc.Result = res match {
-    case Some(m) => ok(callback(m))
-    case None => notFound("not found")
-  }
-  /**
-   * load all model cache
-   * @return
-   */
-  def loadCache() = Action { request =>
-    val startTs = System.currentTimeMillis()
-    if (!ApplicationController.isHealthy) {
-      loadCacheInner()
-    }
-    ok(s"${System.currentTimeMillis() - startTs}")
-  }
-  def loadCacheInner() = {
-    Service.findAll()
-    ServiceColumn.findAll()
-    Label.findAll()
-    LabelMeta.findAll()
-    LabelIndex.findAll()
-    ColumnMeta.findAll()
-  }
-  /**
-   * read
-   */
-  /**
-   * get service info
-   * @param serviceName
-   * @return
-   */
-  def getService(serviceName: String) = Action { request =>
-    val serviceOpt = Management.findService(serviceName)
-    optionResponse(serviceOpt)(_.toJson)
-  }
-  /**
-   * get label info
-   * @param labelName
-   * @return
-   */
-  def getLabel(labelName: String) = Action { request =>
-    val labelOpt = Management.findLabel(labelName)
-    optionResponse(labelOpt)(_.toJson)
-  }
-  /**
-   * get all labels of service
-   * @param serviceName
-   * @return
-   */
-  def getLabels(serviceName: String) = Action { request =>
-    Service.findByName(serviceName) match {
-      case None => notFound(s"Service $serviceName not found")
-      case Some(service) =>
-        val src = Label.findBySrcServiceId(
-        val tgt = Label.findByTgtServiceId(
-        ok(Json.obj("from" ->, "to" ->
-    }
-  }
-  /**
-   * get service columns
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def getServiceColumn(serviceName: String, columnName: String) = Action { request =>
-    val serviceColumnOpt = for {
-      service <- Service.findByName(serviceName)
-      serviceColumn <- ServiceColumn.find(, columnName, useCache = false)
-    } yield serviceColumn
-    optionResponse(serviceColumnOpt)(_.toJson)
-  }
-  /**
-   * create
-   */
-  /**
-   * create service
-   * @return
-   */
-  def createService() = Action(parse.json) { request =>
-    val serviceTry = createServiceInner(request.body)
-    tryResponse(serviceTry)(_.toJson)
-  }
-  def createServiceInner(jsValue: JsValue) = {
-    val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = toServiceElements(jsValue)
-    Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
-  }
-  /**
-   * create label
-   * @return
-   */
-  def createLabel() = Action(parse.json) { request =>
-    val ret = createLabelInner(request.body)
-    tryResponse(ret)(_.toJson)
-  }
-  def createLabelInner(json: JsValue) = for {
-    labelArgs <- toLabelElements(json)
-    label <- (Management.createLabel _).tupled(labelArgs)
-  } yield label
-  /**
-   * add index
-   * @return
-   */
-  def addIndex() = Action(parse.json) { request =>
-    val ret = addIndexInner(request.body)
-    tryResponse(ret)(_.label + " is updated")
-  }
-  def addIndexInner(json: JsValue) = for {
-    (labelName, indices) <- toIndexElements(json)
-    label <- Management.addIndex(labelName, indices)
-  } yield label
-  /**
-   * create service column
-   * @return
-   */
-  def createServiceColumn() = Action(parse.json) { request =>
-    val serviceColumnTry = createServiceColumnInner(request.body)
-    tryResponse(serviceColumnTry) { (columns: Seq[ColumnMeta]) => Json.obj("metas" -> }
-  }
-  def createServiceColumnInner(jsValue: JsValue) = for {
-    (serviceName, columnName, columnType, props) <- toServiceColumnElements(jsValue)
-    serviceColumn <- Management.createServiceColumn(serviceName, columnName, columnType, props)
-  } yield serviceColumn
-  /**
-   * delete
-   */
-  /**
-   * delete label
-   * @param labelName
-   * @return
-   */
-  def deleteLabel(labelName: String) = Action { request =>
-    val deleteLabelTry = deleteLabelInner(labelName)
-    tryResponse(deleteLabelTry)(labelName => labelName + " is deleted")
-  }
-  def deleteLabelInner(labelName: String) = Management.deleteLabel(labelName)
-  /**
-   * delete servieColumn
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def deleteServiceColumn(serviceName: String, columnName: String) = Action { request =>
-    val serviceColumnTry = deleteServiceColumnInner(serviceName, columnName)
-    tryResponse(serviceColumnTry)(columnName => columnName + " is deleted")
-  }
-  def deleteServiceColumnInner(serviceName: String, columnName: String) =
-    Management.deleteColumn(serviceName, columnName)
-  /**
-   * update
-   */
-  /**
-   * add Prop to label
-   * @param labelName
-   * @return
-   */
-  def addProp(labelName: String) = Action(parse.json) { request =>
-    val labelMetaTry = addPropInner(labelName, request.body)
-    tryResponse(labelMetaTry)(_.toJson)
-  }
-  def addPropInner(labelName: String, js: JsValue) = for {
-    prop <- toPropElements(js)
-    labelMeta <- Management.addProp(labelName, prop)
-  } yield labelMeta
-  /**
-   * add prop to serviceColumn
-   * @param serviceName
-   * @param columnName
-   * @return
-   */
-  def addServiceColumnProp(serviceName: String, columnName: String) = Action(parse.json) { request =>
-    addServiceColumnPropInner(serviceName, columnName)(request.body) match {
-      case None => bad(s"can`t find service with $serviceName or can`t find serviceColumn with $columnName")
-      case Some(m) => Ok(m.toJson).as(applicationJsonHeader)
-    }
-  }
-  def addServiceColumnPropInner(serviceName: String, columnName: String)(js: JsValue) = {
-    for {
-      service <- Service.findByName(serviceName)
-      serviceColumn <- ServiceColumn.find(, columnName)
-      prop <- toPropElements(js).toOption
-    } yield {
-      ColumnMeta.findOrInsert(,, prop.defaultValue)
-    }
-  }
-  /**
-   * add props to serviceColumn
-   * @param serviecName
-   * @param columnName
-   * @return
-   */
-  def addServiceColumnProps(serviecName: String, columnName: String) = Action(parse.json) { request =>
-    val jsObjs = request.body.asOpt[List[JsObject]].getOrElse(List.empty[JsObject])
-    val newProps = for {
-      js <- jsObjs
-      newProp <- addServiceColumnPropInner(serviecName, columnName)(js)
-    } yield newProp
-    ok(s"${newProps.size} is added.")
-  }
-  /**
-   * copy label
-   * @param oldLabelName
-   * @param newLabelName
-   * @return
-   */
-  def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
-    val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
-    tryResponse(copyTry)(_.label + "created")
-  }
-  /**
-   * rename label
-   * @param oldLabelName
-   * @param newLabelName
-   * @return
-   */
-  def renameLabel(oldLabelName: String, newLabelName: String) = Action { request =>
-    Label.findByName(oldLabelName) match {
-      case None =>
-      case Some(label) =>
-        Management.updateLabelName(oldLabelName, newLabelName)
-        ok(s"Label was updated")
-    }
-  }
-  /**
-   * update HTable for a label
-   * @param labelName
-   * @param newHTableName
-   * @return
-   */
-  def updateHTable(labelName: String, newHTableName: String) = Action { request =>
-    val updateTry = Management.updateHTable(labelName, newHTableName)
-    tryResponse(updateTry)(_.toString + " label(s) updated.")
-  }
-  case class HTableParams(cluster: String, hTableName: String,
-    preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
-    override def toString(): String = {
-      s"""HtableParams
-         |-- cluster : $cluster
-         |-- hTableName : $hTableName
-         |-- preSplitSize : $preSplitSize
-         |-- hTableTTL : $hTableTTL
-         |-- compressionAlgorithm : $compressionAlgorithm
-         |""".stripMargin
-    }
-  }
-  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
-    def reads(json: JsValue): JsResult[HTableParams] = (
-    (__ \ "cluster").read[String] and
-    (__ \ "hTableName").read[String] and
-    (__ \ "preSplitSize").read[Int] and
-    (__ \ "hTableTTL").readNullable[Int] and
-      (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
-    def writes(o: HTableParams): JsValue = Json.obj(
-      "cluster" -> o.cluster,
-      "hTableName" -> o.hTableName,
-      "preSplitSize" -> o.preSplitSize,
-      "hTableTTL" -> o.hTableTTL,
-      "compressionAlgorithm" -> o.compressionAlgorithm
-    )
-  }
-  implicit object JsErrorJsonWriter extends Writes[JsError] {
-    def writes(o: JsError): JsValue = Json.obj(
-      "errors" -> JsArray(
- {
-          case (path, validationErrors) => Json.obj(
-            "path" -> Json.toJson(path.toString()),
-            "validationErrors" -> JsArray( => Json.obj(
-              "message" -> JsString(validationError.message),
-              "args" -> JsArray( match {
-                case x: Int => JsNumber(x)
-                case x => JsString(x.toString)
-              }))
-            )))
-          )
-        }
-      )
-    )
-  }
-  def createHTable() = Action { request =>
-    //    Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
-[HTableParams] match {
-      case JsSuccess(hTableParams, _) => {
-        Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm))
-        ok(s"HTable was created.")
-      }
-      case err@JsError(_) => bad(Json.toJson(err))
-    }).getOrElse(bad("Invalid Json."))
-  }
diff --git a/app/controllers/ApplicationController.scala b/app/controllers/ApplicationController.scala
deleted file mode 100644
index fefe93e..0000000
--- a/app/controllers/ApplicationController.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package controllers
-import com.kakao.s2graph.core.utils.logger
-import play.api.libs.iteratee.Enumerator
-import play.api.libs.json.{JsString, JsValue}
-import play.api.mvc._
-import scala.concurrent.{ExecutionContext, Future}
-object ApplicationController extends Controller {
-  var isHealthy = true
-  var deployInfo = ""
-  val applicationJsonHeader = "application/json"
-  val jsonParser: BodyParser[JsValue] = controllers.s2parse.json
-  def updateHealthCheck(isHealthy: Boolean) = Action { request =>
-    this.isHealthy = isHealthy
-    Ok(this.isHealthy + "\n")
-  }
-  def healthCheck() = withHeader(parse.anyContent) { request =>
-    if (isHealthy) Ok(deployInfo)
-    else NotFound
-  }
-  def jsonResponse(json: JsValue, headers: (String, String)*) =
-    if (ApplicationController.isHealthy) {
-      Ok(json).as(applicationJsonHeader).withHeaders(headers: _*)
-    } else {
-      Result(
-        header = ResponseHeader(OK),
-        body = Enumerator(json.toString.getBytes()),
-        connection = HttpConnection.Close
-      ).as(applicationJsonHeader).withHeaders((CONNECTION -> "close") +: headers: _*)
-    }
-  def toLogMessage[A](request: Request[A], result: Result)(startedAt: Long): String = {
-    val duration = System.currentTimeMillis() - startedAt
-    val isQueryRequest = result.header.headers.contains("result_size")
-    val resultSize = result.header.headers.getOrElse("result_size", "-1")
-    try {
-      val body = request.body match {
-        case AnyContentAsJson(jsValue) => jsValue match {
-          case JsString(str) => str
-          case _ => jsValue.toString
-        }
-        case _ => request.body.toString
-      }
-      val str =
-        if (isQueryRequest)
-          s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}"
-        else
-          s"${request.method} ${request.uri} took ${duration} ms ${result.header.status} ${resultSize} ${body}"
-"${request.method} ${request.uri} result_size: $resultSize")
-      str
-    } finally {
-      /* pass */
-    }
-  }
-  def withHeaderAsync[A](bodyParser: BodyParser[A])(block: Request[A] => Future[Result])(implicit ex: ExecutionContext) =
-    Action.async(bodyParser) { request =>
-      val startedAt = System.currentTimeMillis()
-      block(request).map { r =>
-, r)(startedAt))
-        r
-      }
-    }
-  def withHeader[A](bodyParser: BodyParser[A])(block: Request[A] => Result) =
-    Action(bodyParser) { request: Request[A] =>
-      val startedAt = System.currentTimeMillis()
-      val r = block(request)
-, r)(startedAt))
-      r
-    }
diff --git a/app/controllers/CounterController.scala b/app/controllers/CounterController.scala
deleted file mode 100644
index c2b4dc2..0000000
--- a/app/controllers/CounterController.scala
+++ /dev/null
@@ -1,747 +0,0 @@
-package controllers
-import com.kakao.s2graph.core.ExceptionHandler
-import com.kakao.s2graph.core.ExceptionHandler.KafkaMessage
-import com.kakao.s2graph.core.mysqls.Label
-import config.CounterConfig
-import models._
-import org.apache.kafka.clients.producer.ProducerRecord
-import play.api.Play
-import play.api.libs.json.Reads._
-import play.api.libs.json._
-import play.api.mvc.{Action, Controller, Request}
-import s2.config.S2CounterConfig
-import s2.counter.core.TimedQualifier.IntervalUnit
-import s2.counter.core._
-import s2.counter.core.v1.{ExactStorageAsyncHBase, RankingStorageRedis}
-import s2.counter.core.v2.{ExactStorageGraph, RankingStorageGraph}
-import s2.models.Counter.ItemType
-import s2.models.{Counter, CounterModel}
-import s2.util.{CartesianProduct, ReduceMapValue, UnitConverter}
-import scala.concurrent.Future
-import scala.util.{Failure, Success, Try}
- * Created by hsleep on 15. 5. 22..
- */
-object CounterController extends Controller {
-  import play.api.libs.concurrent.Execution.Implicits.defaultContext
-  val config = Play.current.configuration.underlying
-  val s2config = new S2CounterConfig(config)
-  private val exactCounterMap = Map(
-    s2.counter.VERSION_1 -> new ExactCounter(config, new ExactStorageAsyncHBase(config)),
-    s2.counter.VERSION_2 -> new ExactCounter(config, new ExactStorageGraph(config))
-  )
-  private val rankingCounterMap = Map(
-    s2.counter.VERSION_1 -> new RankingCounter(config, new RankingStorageRedis(config)),
-    s2.counter.VERSION_2 -> new RankingCounter(config, new RankingStorageGraph(config))
-  )
-  private val tablePrefixMap = Map (
-    s2.counter.VERSION_1 -> "s2counter",
-    s2.counter.VERSION_2 -> "s2counter_v2"
-  )
-  private def exactCounter(version: Byte): ExactCounter = exactCounterMap(version)
-  private def rankingCounter(version: Byte): RankingCounter = rankingCounterMap(version)
-  lazy val counterModel = new CounterModel(config)
-  def getQueryString[T](key: String, default: String)(implicit request: Request[T]): String = {
-    request.getQueryString(key).getOrElse(default)
-  }
-  implicit val counterWrites = new Writes[Counter] {
-    override def writes(o: Counter): JsValue = Json.obj(
-      "version" -> o.version.toInt,
-      "autoComb" -> o.autoComb,
-      "dimension" -> o.dimension,
-      "useProfile" -> o.useProfile,
-      "bucketImpId" -> o.bucketImpId,
-      "useRank" -> o.useRank,
-      "intervalUnit" -> o.intervalUnit,
-      "ttl" -> o.ttl,
-      "dailyTtl" -> o.dailyTtl,
-      "rateAction" -> o.rateActionId.flatMap { actionId =>
-        counterModel.findById(actionId, useCache = false).map { actionPolicy =>
-          Json.obj("service" -> actionPolicy.service, "action" -> actionPolicy.action)
-        }
-      },
-      "rateBase" -> o.rateBaseId.flatMap { baseId =>
-        counterModel.findById(baseId, useCache = false).map { basePolicy =>
-          Json.obj("service" -> basePolicy.service, "action" -> basePolicy.action)
-        }
-      },
-      "rateThreshold" -> o.rateThreshold
-    )
-  }
-  def createAction(service: String, action: String) = Action(s2parse.json) { implicit request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case None =>
-        val body = request.body
-        val version = (body \ "version").asOpt[Int].map(_.toByte).getOrElse(s2.counter.VERSION_2)
-        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(true)
-        val dimension = (body \ "dimension").asOpt[String].getOrElse("")
-        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(false)
-        val bucketImpId = (body \ "bucketImpId").asOpt[String]
-        val useExact = (body \ "useExact").asOpt[Boolean].getOrElse(true)
-        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(true)
-        val intervalUnit = (body \ "intervalUnit").asOpt[String]
-        // 2 day
-        val ttl = (body \ "ttl").asOpt[Int].getOrElse(2 * 24 * 60 * 60)
-        val dailyTtl = (body \ "dailyTtl").asOpt[Int]
-        val regionMultiplier = (body \ "regionMultiplier").asOpt[Int].getOrElse(1)
-        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
-        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
-        val rateThreshold = (body \ "rateThreshold").asOpt[Int]
-        val rateActionId = {
-          for {
-            actionMap <- rateAction
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action)
-          } yield {
-          }
-        }
-        val rateBaseId = {
-          for {
-            actionMap <- rateBase
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action)
-          } yield {
-          }
-        }
-        val hbaseTable = {
-          Seq(tablePrefixMap(version), service, ttl) ++ dailyTtl mkString "_"
-        }
-        // find label
-        val itemType = Label.findByName(action, useCache = false) match {
-          case Some(label) =>
-            ItemType.withName(label.tgtColumnType.toUpperCase)
-          case None =>
-            val strItemType = (body \ "itemType").asOpt[String].getOrElse("STRING")
-            ItemType.withName(strItemType.toUpperCase)
-        }
-        val policy = Counter(useFlag = true, version, service, action, itemType, autoComb = autoComb, dimension,
-          useProfile = useProfile, bucketImpId, useRank = useRank, ttl, dailyTtl, Some(hbaseTable), intervalUnit,
-          rateActionId, rateBaseId, rateThreshold)
-        // prepare exact storage
-        exactCounter(version).prepare(policy)
-        if (useRank) {
-          // prepare ranking storage
-          rankingCounter(version).prepare(policy)
-        }
-        counterModel.createServiceAction(policy)
-        Ok(Json.toJson(Map("msg" -> s"created $service/$action")))
-      case Some(policy) =>
-        Ok(Json.toJson(Map("msg" -> s"already exist $service/$action")))
-    }
-  }
-  def getAction(service: String, action: String) = Action { request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(policy) =>
-        Ok(Json.toJson(policy))
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-  def updateAction(service: String, action: String) = Action(s2parse.json) { request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(oldPolicy) =>
-        val body = request.body
-        val autoComb = (body \ "autoComb").asOpt[Boolean].getOrElse(oldPolicy.autoComb)
-        val dimension = (body \ "dimension").asOpt[String].getOrElse(oldPolicy.dimension)
-        val useProfile = (body \ "useProfile").asOpt[Boolean].getOrElse(oldPolicy.useProfile)
-        val bucketImpId = (body \ "bucketImpId").asOpt[String] match {
-          case Some(s) => Some(s)
-          case None => oldPolicy.bucketImpId
-        }
-        val useRank = (body \ "useRank").asOpt[Boolean].getOrElse(oldPolicy.useRank)
-        val intervalUnit = (body \ "intervalUnit").asOpt[String] match {
-          case Some(s) => Some(s)
-          case None => oldPolicy.intervalUnit
-        }
-        val rateAction = (body \ "rateAction").asOpt[Map[String, String]]
-        val rateBase = (body \ "rateBase").asOpt[Map[String, String]]
-        val rateThreshold = (body \ "rateThreshold").asOpt[Int] match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateThreshold
-        }
-        val rateActionId = {
-          for {
-            actionMap <- rateAction
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action, useCache = false)
-          } yield {
-          }
-        } match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateActionId
-        }
-        val rateBaseId = {
-          for {
-            actionMap <- rateBase
-            service <- actionMap.get("service")
-            action <- actionMap.get("action")
-            policy <- counterModel.findByServiceAction(service, action, useCache = false)
-          } yield {
-          }
-        } match {
-          case Some(i) => Some(i)
-          case None => oldPolicy.rateBaseId
-        }
-        // new counter
-        val policy = Counter(id =, useFlag = oldPolicy.useFlag, oldPolicy.version, service, action, oldPolicy.itemType, autoComb = autoComb, dimension,
-          useProfile = useProfile, bucketImpId, useRank = useRank, oldPolicy.ttl, oldPolicy.dailyTtl, oldPolicy.hbaseTable, intervalUnit,
-          rateActionId, rateBaseId, rateThreshold)
-        counterModel.updateServiceAction(policy)
-        Ok(Json.toJson(Map("msg" -> s"updated $service/$action")))
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-  /**
-   * Prepares storage for a different storage version of an existing counter policy (data migration).
-   *
-   * The JSON body must carry an integer "version". When it differs from the policy's current version,
-   * a table name is built from the version's table prefix, the service, the policy ttl and (if set)
-   * the daily ttl; the exact counter, and the ranking counter when the policy uses ranking, are then
-   * prepared against a copy of the policy that points at that table.
-   *
-   * Responds 400 when "version" is missing or not an integer, 404 when no policy exists for
-   * service/action, and 200 otherwise.
-   */
-  def prepareAction(service: String, action: String) = Action(s2parse.json) { request =>
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(policy) =>
-        // asOpt instead of as: a missing or malformed "version" is a client error,
-        // not an unhandled JsResultException.
-        (request.body \ "version").asOpt[Int].map(_.toByte) match {
-          case None =>
-            BadRequest(Json.toJson(Map("msg" -> "need version.")))
-          case Some(version) if version != policy.version =>
-            // change table name
-            val newTableName = Seq(tablePrefixMap(version), service, policy.ttl) ++ policy.dailyTtl mkString "_"
-            val newPolicy = policy.copy(version = version, hbaseTable = Some(newTableName))
-            exactCounter(version).prepare(newPolicy)
-            if (newPolicy.useRank) {
-              rankingCounter(version).prepare(newPolicy)
-            }
-            Ok(Json.toJson(Map("msg" -> s"prepare storage v$version $service/$action")))
-          case Some(version) =>
-            Ok(Json.toJson(Map("msg" -> s"already prepared storage v$version $service/$action")))
-        }
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-  /**
-   * Deletes a counter policy together with its storage.
-   *
-   * Destroys the exact counter storage, the ranking counter storage when the policy uses ranking,
-   * and finally the policy record itself. Responds 404 when no policy exists for service/action.
-   * Any exception raised along the way propagates to the caller unchanged.
-   */
-  def deleteAction(service: String, action: String) = Action.apply {
-    counterModel.findByServiceAction(service, action, useCache = false) match {
-      case Some(policy) =>
-        exactCounter(policy.version).destroy(policy)
-        if (policy.useRank) {
-          rankingCounter(policy.version).destroy(policy)
-        }
-        counterModel.deleteServiceAction(policy)
-        Ok(Json.toJson(Map("msg" -> s"deleted $service/$action")))
-      case None =>
-        NotFound(Json.toJson(Map("msg" -> s"$service.$action not found")))
-    }
-  }
-  /**
-   * Returns exact (or decayed) counts for a single item of a counter policy as JSON.
-   *
-   * Reads from the query string: "interval" (presumably falling back to "step", then "t"),
-   * "limit" (default "1"), "sum", "from", "to", and every key starting with ':' as a dimension
-   * filter whose values are comma-separated. When both "from" and "to" are given the result is not
-   * limited; otherwise at most `limit` entries per interval are returned.
-   *
-   * Responds 404 when no policy exists for service/action.
-   */
-  def getExactCountAsync(service: String, action: String, item: String) = Action.async { implicit request =>
-    val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    val limit = getQueryString("limit", "1").toInt
-    val qsSum = request.getQueryString("sum")
-    val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    val limitOpt = (optFrom, optTo) match {
-      case (Some(_), Some(_)) =>
-        None
-      case _ =>
-        Some(limit)
-    }
-    // find dimension
-    // startsWith rather than charAt(0): an empty query-string key must not throw.
-    lazy val dimQueryValues = request.queryString.filterKeys { k => k.startsWith(":") }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').filter(_.nonEmpty).toSet)
-    }
-    counterModel.findByServiceAction(service, action) match {
-      case Some(policy) =>
-        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
-        val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
-        // Decayed sum only applies when more than one qualifier is involved and "sum" was requested.
-        val futureJs = {
-          if (tqs.head.length > 1 && qsSum.isDefined) {
-            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
-          } else {
-            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
-          }
-        }
-        futureJs.map { jsVal =>
-          Ok(jsVal)
-        }
-      case None =>
-        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
-    }
-  }
-  /**
-   * Extracts the parameters of one exact-count query from a JSON object of the form
-   * {
-   *    "service": string, "action": string, "itemIds": [string],
-   *    "intervals": [string], "limit": int, "from": long, "to": long, "sum": string,
-   *    "dimensions": [{"key": list[String]}]
-   * }
-   * "service", "action" and "itemIds" are required (read with `as`); "intervals" defaults to
-   * Seq("t"), "limit" defaults to 1, and "from", "to", "sum" and "dimensions" are optional.
-   *
-   * @return the tuple (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
-   */
-  private def parseExactCountParam(jsValue: JsValue) = {
-    val service = (jsValue \ "service").as[String]
-    val action = (jsValue \ "action").as[String]
-    val itemIds = (jsValue \ "itemIds").as[Seq[String]]
-    val intervals = (jsValue \ "intervals").asOpt[Seq[String]].getOrElse(Seq("t"))
-    val limit = (jsValue \ "limit").asOpt[Int].getOrElse(1)
-    val from = (jsValue \ "from").asOpt[Long]
-    val to = (jsValue \ "to").asOpt[Long]
-    val sum = (jsValue \ "sum").asOpt[String]
-    val dimensions = {
-      for {
-        dimension <- (jsValue \ "dimensions").asOpt[Seq[JsObject]].getOrElse(Nil)
-        (k, vs) <- dimension.fields
-      } yield {
-        // NOTE(review): the next line looks truncated (presumably `k -> vs.as[Seq[String]].toSet`) -- confirm against the original file
-        k ->[Seq[String]].toSet
-      }
-    }.toMap
-    (service, action, itemIds, intervals, limit, from, to, dimensions, sum)
-  }
-  /**
-   * Batch variant of the exact-count lookup: the JSON body is a list of query objects (parsed by
-   * parseExactCountParam), and one result is produced per (query, item) pair for which a policy exists.
-   * A body that is not a list of JSON objects yields an empty result list.
-   *
-   * NOTE(review): the `optFrom =` / `optTo =` lines below have lost their right-hand sides in this
-   * copy; restore them from the original file.
-   */
-  def getExactCountAsyncMulti = Action.async(s2parse.json) { implicit request =>
-    val jsValue = request.body
-    try {
-      val futures = {
-        for {
-          jsObject <- jsValue.asOpt[List[JsObject]].getOrElse(Nil)
-          (service, action, itemIds, intervalUnits, limit, from, to, dimQueryValues, qsSum) = parseExactCountParam(jsObject)
-          optFrom =
-          optTo =
-          timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
-          policy <- counterModel.findByServiceAction(service, action).toSeq
-          item <- itemIds
-        } yield {
-          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optFrom, optTo)
-          // shadows the `timeRange` bound in the generator above with the same expression
-          val timeRange = TimedQualifier.getTimeRange(intervalUnits, limit, optFrom, optTo)
-          // no limit when an explicit from/to window is given
-          val limitOpt = (optFrom, optTo) match {
-            case (Some(_), Some(_)) =>
-              None
-            case _ =>
-              Some(limit)
-          }
-//          Logger.warn(s"$item, $limit, $optFrom, $optTo, $qsSum, $tqs")
-          if (tqs.head.length > 1 && qsSum.isDefined) {
-            getDecayedCountToJs(policy, item, timeRange, dimQueryValues, qsSum)
-          } else {
-            getExactCountToJs(policy, item, timeRange, limitOpt, dimQueryValues)
-          }
-        }
-      }
-      Future.sequence(futures).map { rets =>
-        Ok(Json.toJson(rets))
-      }
-    } catch {
-      // rethrows unchanged; the BadRequest alternative is left commented out
-      case e: Exception =>
-        throw e
-//        Future.successful(BadRequest(s"$jsValue"))
-    }
-  }
-  /**
-   * Converts grouped fetched counts into one result item per (interval, dimension) group.
-   * Within a group the entries are ordered by descending timestamp and, when `limitOpt` is
-   * defined, cut to that many entries.
-   */
-  private [controllers] def fetchedToResult(fetchedCounts: FetchedCountsGrouped, limitOpt: Option[Int]): Seq[ExactCounterIntervalItem] = {
-    fetchedCounts.intervalWithCountMap.map { case ((interval, dimKeyValues), values) =>
-      val newestFirst = values.toSeq.sortBy { case (eq, _) => -eq.tq.ts }
-      val selected = limitOpt.map(limit => newestFirst.take(limit)).getOrElse(newestFirst)
-      val counterItems = selected.map { case (eq, value) =>
-        ExactCounterItem(eq.tq.ts, value, value.toDouble)
-      }
-      ExactCounterIntervalItem(interval.toString, dimKeyValues, counterItems)
-    }
-  }.toSeq
-  /**
-   * Converts decayed counts into result items: one interval item per qualifier, each holding a
-   * single counter item with the score both truncated to Long and as the original value.
-   */
-  private def decayedToResult(decayedCounts: DecayedCounts): Seq[ExactCounterIntervalItem] = {
-    decayedCounts.qualifierWithCountMap.map { case (eq, score) =>
-      val counterItem = ExactCounterItem(eq.tq.ts, score.toLong, score)
-      ExactCounterIntervalItem(eq.tq.q.toString, eq.dimKeyValues, Seq(counterItem))
-    }
-  }.toSeq
-  /**
-   * Fetches exact counts for one item and renders them as the JSON form of ExactCounterResult.
-   * All fetched groups are flattened into a single list of interval items.
-   */
-  private def getExactCountToJs(policy: Counter,
-                                item: String,
-                                timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                                limitOpt: Option[Int],
-                                dimQueryValues: Map[String, Set[String]]): Future[JsValue] = {
-    val futureFetched = exactCounter(policy.version).getCountsAsync(policy, Seq(item), timeRange, dimQueryValues)
-    futureFetched.map { fetchedList =>
-      val items = fetchedList.map(fetched => fetchedToResult(fetched, limitOpt)).flatten
-      val meta = ExactCounterResultMeta(policy.service, policy.action, item)
-      Json.toJson(ExactCounterResult(meta, items))
-    }
-  }
-  /**
-   * Fetches decayed counts for one item and renders the first returned entry as the JSON form of
-   * ExactCounterResult. The item key reported in the meta section comes from that entry.
-   */
-  private def getDecayedCountToJs(policy: Counter,
-                                  item: String,
-                                  timeRange: Seq[(TimedQualifier, TimedQualifier)],
-                                  dimQueryValues: Map[String, Set[String]],
-                                  qsSum: Option[String]): Future[JsValue] = {
-    val futureDecayed =
-      exactCounter(policy.version).getDecayedCountsAsync(policy, Seq(item), timeRange, dimQueryValues, qsSum)
-    futureDecayed.map { results =>
-      // only the first entry is used; an empty result fails here
-      val first = results.head
-      val meta = ExactCounterResultMeta(policy.service, policy.action, first.exactKey.itemKey)
-      Json.toJson(ExactCounterResult(meta, decayedToResult(first)))
-    }
-  }
-  /**
-   * Returns ranking results for a counter policy as JSON.
-   *
-   * Reads from the query string: "interval" (presumably falling back to "step", then "t"),
-   * "limit" (default "1"), "k" (default "10"), "sum", "from", "to", and every key starting with
-   * ':' as a dimension whose values are comma-separated.
-   * Responds 404 when no policy exists for service/action, and 501 with the exception message when
-   * an UnsupportedOperationException is thrown while building the result.
-   *
-   * NOTE(review): several lines look truncated in this copy (the empty `yield` of `dimensions`,
-   * the `dimKeys` expression, and whatever preceded `{ result =>`); restore them from the original
-   * file before relying on this text.
-   */
-  def getRankingCountAsync(service: String, action: String) = Action.async { implicit request =>
-    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    lazy val limit = getQueryString("limit", "1").toInt
-    lazy val kValue = getQueryString("k", "10").toInt
-    lazy val qsSum = request.getQueryString("sum")
-    lazy val optFrom = request.getQueryString("from").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    lazy val optTo = request.getQueryString("to").filter(!_.isEmpty).map(UnitConverter.toMillis)
-    // find dimension
-    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').toList)
-    }
-    val dimensions = {
-      for {
-        values <- CartesianProduct(dimensionMap.values.toList).toSeq
-      } yield {
-      }
-    }
-    counterModel.findByServiceAction(service, action) match {
-      case Some(policy) =>
-        val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit, optTo)
-        val dimKeys = {
-          for {
-            dimension <- dimensions
-          } yield {
-            dimension -> => RankingKey(, policy.version, ExactQualifier(tq, dimension)))
-          }
-        }
-        // if tqs has only 1 tq, do not apply sum function
-        try {
-          val rankResult = {
-            if (tqs.length > 1 && qsSum.isDefined) {
-              getSumRankCounterResultAsync(policy, dimKeys, kValue, qsSum)
-            } else {
-              // no summary
-              Future.successful(getRankCounterResult(policy, dimKeys, kValue))
-            }
-          }
- { result =>
-            Ok(Json.toJson(result))
-          }
-        } catch {
-          case e: UnsupportedOperationException =>
-            Future.successful(NotImplemented(Json.toJson(
-              Map("msg" -> e.getMessage)
-            )))
-          // everything else is rethrown unchanged
-          case e: Throwable =>
-            throw e
-        }
-      case None =>
-        Future.successful(NotFound(Json.toJson(Map("msg" -> s"$service.$action not found"))))
-    }
-  }
-  /**
-   * Deletes ranking entries of a counter policy for every (dimension, timed qualifier) combination
-   * derived from the query string, and reports the deleted keys as "q.ts.dimension" strings.
-   *
-   * Reads from the query string: "interval" (presumably falling back to "step", then "t"),
-   * "limit" (default "1"), and every key starting with ':' as a dimension.
-   * Responds 404 when no policy exists for service/action.
-   *
-   * NOTE(review): the `yield` of `dimensions` is empty and the first argument of `RankingKey(`
-   * is missing in this copy; restore them from the original file.
-   */
-  def deleteRankingCount(service: String, action: String) = Action.async { implicit request =>
-    lazy val intervalUnits = getQueryString("interval", getQueryString("step", "t")).split(',').toSeq
-      .map(IntervalUnit.withName)
-    lazy val limit = getQueryString("limit", "1").toInt
-    // find dimension
-    lazy val dimensionMap = request.queryString.filterKeys { k => k.charAt(0) == ':' }.map { case (k, v) =>
-      (k.substring(1), v.mkString(",").split(',').toList)
-    }
-    val dimensions = {
-      for {
-        values <- CartesianProduct(dimensionMap.values.toList).toSeq
-      } yield {
-      }
-    }
-    Future {
-      counterModel.findByServiceAction(service, action) match {
-        case Some(policy) =>
-          val tqs = TimedQualifier.getQualifiersToLimit(intervalUnits, limit)
-          val keys = {
-            for {
-              dimension <- dimensions
-              tq <- tqs
-            } yield {
-              RankingKey(, policy.version, ExactQualifier(tq, dimension))
-            }
-          }
-          // delete every key one by one
-          for {
-            key <- keys
-          } {
-            rankingCounter(policy.version).delete(key)
-          }
-          Ok(JsObject(
-            Seq(
-              ("msg", Json.toJson(s"delete ranking in $service.$action")),
-              ("items", Json.toJson({
-                for {
-                  key <- keys
-                } yield {
-                  s"${key.eq.tq.q}.${key.eq.tq.ts}.${key.eq.dimension}"
-                }
-              }))
-            )
-          ))
-        case None =>
-          NotFound(Json.toJson(
-            Map("msg" -> s"$service.$action not found")
-          ))
-      }
-    }
-  }
-  // Merges two maps keyed by ExactKeyTrait using RateRankingValue.reduce; RateRankingValue(-1, -1) is passed
-  // as the second constructor argument (presumably the value used for a key missing on one side -- TODO confirm).
-  val reduceRateRankingValue = new ReduceMapValue[ExactKeyTrait, RateRankingValue](RateRankingValue.reduce, RateRankingValue(-1, -1))
-  /**
-   * Fetches decayed counts for the given items over a single time range and reshapes the result
-   * into (exact key, value) pairs, one pair per value in each entry's qualifier map.
-   * Each dimension value is wrapped in a one-element set before the lookup.
-   */
-  private def getDecayedCountsAsync(policy: Counter,
-                                    items: Seq[String],
-                                    timeRange: (TimedQualifier, TimedQualifier),
-                                    dimension: Map[String, String],
-                                    qsSum: Option[String]): Future[Seq[(ExactKeyTrait, Double)]] = {
-    val dimQueryValues = dimension.mapValues(s => Set(s))
-    exactCounter(policy.version)
-      .getDecayedCountsAsync(policy, items, Seq(timeRange), dimQueryValues, qsSum)
-      .map { results =>
-        results.collect { case DecayedCounts(exactKey, qcMap) =>
-          qcMap.values.map(value => exactKey -> value)
-        }.flatten
-      }
-  }
-  /**
-   * Builds a ranking result by summing decayed counts over the time span covered by each
-   * dimension's ranking keys, then taking the top `kValue` items by score.
-   *
-   * For rate and trend counters the score comes from combining the counts of the policy's
-   * rateActionId and rateBaseId policies via reduceRateRankingValue; for trend counters the base
-   * range is shifted with `add(-1)`. Otherwise the policy's own decayed counts are used.
-   * For BLOB item types the ranked id is resolved through getBlobValue and a missing blob raises
-   * an Exception.
-   *
-   * NOTE(review): several lines look truncated in this copy (`val tqs = => ...`, the expressions
-   * that preceded the `{ case (k, score) =>`, `{ case (actionScores, baseScores) =>` and
-   * `{ keyWithScore =>` blocks); restore them from the original file.
-   */
-  def getSumRankCounterResultAsync(policy: Counter,
-                                   dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
-                                   kValue: Int,
-                                   qsSum: Option[String]): Future[RankCounterResult] = {
-    val futures = {
-      for {
-        (dimension, keys) <- dimKeys
-      } yield {
-        val tqs = => rk.eq.tq)
-        // the last qualifier is used as the range start and the first as the range end
-        val (tqFrom, tqTo) = (tqs.last, tqs.head)
-        val items = rankingCounter(policy.version).getAllItems(keys, kValue)
-//        Logger.warn(s"item count: ${items.length}")
-        val future = {
-          if (policy.isRateCounter) {
-            // .get throws when the referenced action/base policy id is unset or cannot be found
-            val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get
-            val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get
-            val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
-     { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
-              }.toMap
-            }
-            val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
-     { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
-              }.toMap
-            }
-   { case (actionScores, baseScores) =>
-              reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
-//                Logger.warn(s"$k -> $rrv")
-                k -> rrv.rankingValue.score
-              }.toSeq
-            }
-          }
-          else if (policy.isTrendCounter) {
-            // .get throws when the referenced action/base policy id is unset or cannot be found
-            val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).get
-            val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).get
-            val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
-     { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
-              }.toMap
-            }
-            // base counts are read over the range shifted by add(-1)
-            val futureBase = getDecayedCountsAsync(basePolicy, items, (tqFrom.add(-1), tqTo.add(-1)), dimension, qsSum).map { seq =>
-     { case (k, score) =>
-                ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
-              }.toMap
-            }
-   { case (actionScores, baseScores) =>
-              reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
-//                Logger.warn(s"$k -> $rrv")
-                k -> rrv.rankingValue.score
-              }.toSeq
-            }
-          }
-          else {
-            getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum)
-          }
-        }
- { keyWithScore =>
-          // highest score first, keep the top kValue
-          val ranking = keyWithScore.sortBy(-_._2).take(kValue)
-          val rankCounterItems = {
-            for {
-              idx <- ranking.indices
-              (exactKey, score) = ranking(idx)
-            } yield {
-              val realId = policy.itemType match {
-                case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
-                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
-                case _ => exactKey.itemKey
-              }
-              // ranks are 1-based
-              RankCounterItem(idx + 1, realId, score)
-            }
-          }
-          val eq = ExactQualifier(tqFrom, dimension)
-          RankCounterDimensionItem(eq.tq.q.toString, eq.tq.ts, eq.dimension, -1, rankCounterItems)
-        }
-      }
-    }
-    Future.sequence(futures).map { dimensionResultList =>
-      RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
-    }
-  }
-  /**
-   * Builds a ranking result without summing: for every (dimension, ranking key) pair the top
-   * `kValue` entries are read from the ranking counter and turned into 1-based ranked items.
-   * For BLOB item types the ranked id is resolved through getBlobValue and a missing blob raises
-   * an Exception.
-   *
-   * NOTE(review): the fourth argument of RankCounterDimensionItem below looks truncated in this
-   * copy; restore it from the original file.
-   */
-  def getRankCounterResult(policy: Counter, dimKeys: Seq[(Map[String, String], Seq[RankingKey])], kValue: Int): RankCounterResult = {
-    val dimensionResultList = {
-      for {
-        (dimension, keys) <- dimKeys
-        key <- keys
-      } yield {
-        val rankingValue = rankingCounter(policy.version).getTopK(key, kValue)
-        val ranks = {
-          for {
-            rValue <- rankingValue.toSeq
-            idx <- rValue.values.indices
-            rank = idx + 1
-          } yield {
-            val (id, score) = rValue.values(idx)
-            val realId = policy.itemType match {
-              case ItemType.BLOB =>
-                exactCounter(policy.version)
-                  .getBlobValue(policy, id)
-                  .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} $id"))
-              case _ => id
-            }
-            RankCounterItem(rank, realId, score)
-          }
-        }
-        val eq = key.eq
-        val tq = eq.tq
-        RankCounterDimensionItem(tq.q.toString, tq.ts, eq.dimension, => v.totalScore).getOrElse(0d), ranks)
-      }
-    }
-    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
-  }
-  // Kafka producer record with String key and String value, used when enqueuing counter messages.
-  type Record = ProducerRecord[String, String]
-  /**
-   * Enqueues a counter increment message for the given service/action/item.
-   *
-   * JSON body fields read here (all optional):
-   *   timestamp: Long    -- defaults to the current time in milliseconds
-   *   dimension: object  -- defaults to {}
-   *   property: object   -- defaults to {}
-   *
-   * The message is a tab-separated line of timestamp, service, action, item, dimension and
-   * property, keyed by "timestamp.item". Responds with the service/action/item meta on success
-   * and 404 when no policy exists for service/action.
-   */
-  def incrementCount(service: String, action: String, item: String) = Action.async(s2parse.json) { request =>
-    Future {
-      counterModel.findByServiceAction(service, action) match {
-        case None =>
-          NotFound(Json.toJson(
-            Map("msg" -> s"$service.$action not found")
-          ))
-        case Some(_) =>
-          val body = request.body
-          try {
-            val ts = (body \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()).toString
-            val dimension = (body \ "dimension").asOpt[JsValue].getOrElse(Json.obj())
-            val property = (body \ "property").asOpt[JsValue].getOrElse(Json.obj())
-            val msg = List(ts, service, action, item, dimension, property).mkString("\t")
-            // produce to kafka
-            // hash partitioner by key
-            val record = new Record(CounterConfig.KAFKA_TOPIC_COUNTER, s"$ts.$item", msg)
-            ExceptionHandler.enqueue(KafkaMessage(record))
-            val metaMap = Map("service" -> service, "action" -> action, "item" -> item)
-            Ok(Json.toJson(
-              Map(
-                "meta" -> metaMap
-              )
-            ))
-          }
-          catch {
-            case e: JsResultException =>
-              BadRequest(Json.toJson(
-                Map("msg" -> s"need timestamp.")
-              ))
-          }
-      }
-    }
-  }
diff --git a/app/controllers/EdgeController.scala b/app/controllers/EdgeController.scala
deleted file mode 100644
index 6279b2c..0000000
--- a/app/controllers/EdgeController.scala
+++ /dev/null
@@ -1,220 +0,0 @@
-package controllers
-import actors.QueueActor
-import com.kakao.s2graph.core.GraphExceptions.BadQueryException
-import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
-import com.kakao.s2graph.core.types.LabelWithDirection
-import com.kakao.s2graph.core.utils.logger
-import config.Config
-import org.apache.kafka.clients.producer.ProducerRecord
-import play.api.libs.json._
-import play.api.mvc.{Controller, Result}
-import scala.collection.Seq
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-object EdgeController extends Controller with RequestParser {
-  import ExceptionHandler._
-  import controllers.ApplicationController._
-  import play.api.libs.concurrent.Execution.Implicits._
-  private val s2: Graph =
-  /**
-   * Parses edges from `jsValue` for the given operation, logs each one to Kafka (async topic for
-   * async edges, regular topic otherwise) and then applies the non-async edges.
-   * Responds Unauthorized when this server is not a write server, BadRequest on a
-   * JsonParseException and InternalServerError on any other Exception.
-   *
-   * NOTE(review): both branches of `if (withWait)` look truncated in this copy (the withWait branch
-   * produces no result, and the receiver of `{ edge => ... }` is missing); restore them from the
-   * original file.
-   */
-  def tryMutates(jsValue: JsValue, operation: String, withWait: Boolean = false): Future[Result] = {
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-    else {
-      try {
-        logger.debug(s"$jsValue")
-        val edges = toEdges(jsValue, operation)
-        for (edge <- edges) {
-          if (edge.isAsync)
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, edge, None))
-          else
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, edge, None))
-        }
-        // async edges are only logged, not stored here
-        val edgesToStore = edges.filterNot(e => e.isAsync)
-        if (withWait) {
-          val rets = s2.mutateEdges(edgesToStore, withWait = true)
-        } else {
-          val rets = { edge => QueueActor.router ! edge; true }
-          Future.successful(jsonResponse(Json.toJson(rets)))
-        }
-      } catch {
-        case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
-        case e: Exception =>
-          logger.error(s"mutateAndPublish: $e", e)
-          // NOTE(review): interpolating getStackTrace prints the array's default toString, not the trace
-          Future.successful(InternalServerError(s"${e.getStackTrace}"))
-      }
-    }
-  }
-  /**
-   * Bulk mutation from text: splits `str` into lines, parses each line into a graph element,
-   * logs every element to Kafka (async topic for async elements, regular topic otherwise) and
-   * then applies the non-async elements.
-   * Responds BadRequest on a JsonParseException and InternalServerError on any other Throwable.
-   *
-   * NOTE(review): both branches of `if (withWait)` look truncated in this copy (the withWait branch
-   * produces no result, and the receiver of `{ element => ... }` is missing); restore them from the
-   * original file.
-   */
-  def mutateAndPublish(str: String, withWait: Boolean = false): Future[Result] = {
-    // NOTE(review): the result of this check is discarded (no else/return), so processing continues
-    // on non-write servers -- compare with tryMutates, which uses if/else
-    if (!Config.IS_WRITE_SERVER) Future.successful(Unauthorized)
-    logger.debug(s"$str")
-    val edgeStrs = str.split("\\n")
-    // counters are updated while parsing but not read anywhere in this method
-    var vertexCnt = 0L
-    var edgeCnt = 0L
-    try {
-      val elements =
-        for (edgeStr <- edgeStrs; str <- GraphUtil.parseString(edgeStr); element <- Graph.toGraphElement(str)) yield {
-          element match {
-            case v: Vertex => vertexCnt += 1
-            case e: Edge => edgeCnt += 1
-          }
-          if (element.isAsync) {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC_ASYNC, element, Some(str)))
-          } else {
-            ExceptionHandler.enqueue(toKafkaMessage(Config.KAFKA_LOG_TOPIC, element, Some(str)))
-          }
-          element
-        }
-      //FIXME:
-      val elementsToStore = elements.filterNot(e => e.isAsync)
-      if (withWait) {
-        val rets = s2.mutateElements(elementsToStore, withWait)
-      } else {
-        val rets = { element => QueueActor.router ! element; true }
-        Future.successful(jsonResponse(Json.toJson(rets)))
-      }
-    } catch {
-      case e: GraphExceptions.JsonParseException => Future.successful(BadRequest(s"$e"))
-      case e: Throwable =>
-        logger.error(s"mutateAndPublish: $e", e)
-        // NOTE(review): interpolating getStackTrace prints the array's default toString, not the trace
-        Future.successful(InternalServerError(s"${e.getStackTrace}"))
-    }
-  }
-  // Endpoint wrappers: each one forwards the request body to mutateAndPublish or tryMutates.
-  // The *WithWait variants pass withWait = true; the others use the default (false).
-
-  /** Text body, handled by mutateAndPublish. */
-  def mutateBulk() = withHeaderAsync(parse.text) { request =>
-    mutateAndPublish(request.body)
-  }
-  /** JSON body, operation "insert". */
-  def inserts() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert")
-  }
-  /** JSON body, operation "insert", withWait = true. */
-  def insertsWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insert", withWait = true)
-  }
-  /** JSON body, operation "insertBulk". */
-  def insertsBulk() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "insertBulk")
-  }
-  /** JSON body, operation "delete". */
-  def deletes() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete")
-  }
-  /** JSON body, operation "delete", withWait = true. */
-  def deletesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "delete", withWait = true)
-  }
-  /** JSON body, operation "update". */
-  def updates() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update")
-  }
-  /** JSON body, operation "update", withWait = true. */
-  def updatesWithWait() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "update", withWait = true)
-  }
-  /** JSON body, operation "increment". */
-  def increments() = withHeaderAsync(jsonParser) { request =>
-    tryMutates(request.body, "increment")
-  }
-  /**
-   * Parses edges from the JSON body with operation "incrementCount", applies them through
-   * s2.incrementCounts and responds with one {"success", "result"} object per result.
-   *
-   * NOTE(review): the receiver of `{ case (isSuccess, resultCount) => ... }` is missing in this
-   * copy; restore it from the original file.
-   */
-  def incrementCounts() = withHeaderAsync(jsonParser) { request =>
-    val jsValue = request.body
-    val edges = toEdges(jsValue, "incrementCount")
-    s2.incrementCounts(edges).map { results =>
-      val json = { case (isSuccess, resultCount) =>
-        Json.obj("success" -> isSuccess, "result" -> resultCount)
-      }
-      jsonResponse(Json.toJson(json))
-    }
-  }
-  /** JSON endpoint that forwards the body to deleteAllInner with withWait = false. */
-  def deleteAll() = withHeaderAsync(jsonParser) { request =>
-    deleteAllInner(request.body, withWait = false)
-  }
-  /**
-   * Deletes all adjacent edges for each request object in `jsValue`.
-   *
-   * Fields read per object: "label" (required), "direction" (default "out"), "ids" (default empty),
-   * "timestamp" (default: current time in milliseconds). Every (id, label) pair is logged to Kafka
-   * as a tab-separated "deleteAll" line before the delete is issued; on failure the same lines are
-   * re-enqueued to `failTopic`.
-   *
-   * NOTE(review): several expressions look truncated in this copy (the collection passed to
-   * `Future.sequence(`, and whatever preceded the `{ ret =>` and `{ rst =>` blocks); restore them
-   * from the original file.
-   */
-  def deleteAllInner(jsValue: JsValue, withWait: Boolean) = {
-    val deleteResults = Future.sequence([Seq[JsValue]] map { json =>
-      val labelName = (json \ "label").as[String]
-      // an unknown label name yields an empty label list
-      val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil)
-      val direction = (json \ "direction").asOpt[String].getOrElse("out")
-      val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil)
-      val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis())
-      val vertices = toVertices(labelName, direction, ids)
-      /** logging for delete all request */
-      val kafkaMessages = for {
-        id <- ids
-        label <- labels
-      } yield {
-          val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t")
-          val topic = if (label.isAsync) Config.KAFKA_LOG_TOPIC_ASYNC else Config.KAFKA_LOG_TOPIC
-          val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-          kafkaMsg
-        }
-      ExceptionHandler.enqueues(kafkaMessages)
-      val future = s2.deleteAllAdjacentEdges(vertices.toList, labels, GraphUtil.directions(direction), ts)
-      // on an Exception: log it and re-enqueue the same lines to the fail topic
-      future.onFailure { case ex: Exception =>
-        logger.error(s"[Error]: deleteAllInner failed.", ex)
-        val kafkaMessages = for {
-          id <- ids
-          label <- labels
-        } yield {
-            val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t")
-            val topic = failTopic
-            val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-            kafkaMsg
-          }
-        ExceptionHandler.enqueues(kafkaMessages)
-        throw ex
-      }
-      if (withWait) {
- { ret =>
-          // a false result is treated like a failure: log and re-enqueue to the fail topic
-          if (!ret) {
-            logger.error(s"[Error]: deleteAllInner failed.")
-            val kafkaMessages = for {
-              id <- ids
-              label <- labels
-            } yield {
-                val tsv = Seq(ts, "deleteAll", "e", id, id, label.label, "{}", direction).mkString("\t")
-                val topic = failTopic
-                val kafkaMsg = KafkaMessage(new ProducerRecord[Key, Val](topic, null, tsv))
-                kafkaMsg
-              }
-            ExceptionHandler.enqueues(kafkaMessages)
-            false
-          } else {
-            true
-          }
-        }
-      } else {
-        // without waiting, success is reported immediately
-        Future.successful(true)
-      }
-    })
- { rst =>
-      logger.debug(s"deleteAllInner: $rst")
-      Ok(s"deleted... ${rst.toString()}")
-    }
-  }
diff --git a/app/controllers/ExperimentController.scala b/app/controllers/ExperimentController.scala
deleted file mode 100644
index 41614ef..0000000
--- a/app/controllers/ExperimentController.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-package controllers
-import com.kakao.s2graph.core.mysqls._
-import com.kakao.s2graph.core.utils.logger
-import play.api.Play.current
-import play.api.libs.json.{JsObject, JsString, JsValue, Json}
-import play.api.mvc._
-import scala.concurrent.Future
-object ExperimentController extends Controller with RequestParser {
-  val impressionKey = "S2-Impression-Id"
-  import ApplicationController._
-  /**
-   * Resolves the bucket for (accessToken, experimentName, uuid) and dispatches the request to it:
-   * graph-query buckets go through buildRequestInner, all others through buildRequest.
-   * Responds 404 when no bucket is found and 400 when building the request throws an Exception.
-   *
-   * NOTE(review): the first argument of `Experiment.findBy(` is missing in this copy; restore it
-   * from the original file.
-   */
-  def experiment(accessToken: String, experimentName: String, uuid: String) = withHeaderAsync(parse.anyContent) { request =>
-    val bucketOpt = for {
-      service <- Service.findByAccessToken(accessToken)
-      experiment <- Experiment.findBy(, experimentName)
-      bucket <- experiment.findBucket(uuid)
-    } yield bucket
-    bucketOpt match {
-      case None => Future.successful(NotFound("bucket is not found."))
-      case Some(bucket) =>
-        try {
-          if (bucket.isGraphQuery) buildRequestInner(request, bucket, uuid)
-          else buildRequest(request, bucket, uuid)
-        } catch {
-          // only exceptions thrown synchronously while building the request are caught here
-          case e: Exception =>
-            logger.error(e.toString())
-            Future.successful(BadRequest(s"wrong or missing template parameter: ${e.getMessage}"))
-        }
-    }
-  }
-  /**
-   * Renders the bucket's request-body template and parses the result as JSON.
-   *
-   * "#uuid" is replaced with `uuid` first. Then, when `requestKeyJsonOpt` holds a JSON object,
-   * every occurrence of each of its keys in the template is replaced with the field's value:
-   * the raw string for JSON strings, the JSON text for anything else.
-   */
-  def makeRequestJson(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): JsValue = {
-    val template = bucket.requestBody.replace("#uuid", uuid)
-    val fields = requestKeyJsonOpt.flatMap(_.asOpt[JsObject]).toSeq.flatMap(_.fieldSet)
-    val rendered = fields.foldLeft(template) { case (acc, (key, value)) =>
-      val replacement = value match {
-        case JsString(s) => s
-        case _ => value.toString
-      }
-      acc.replace(key, replacement)
-    }
-    Json.parse(rendered)
-  }
-  /**
-   * Serves a graph-query bucket in-process: the rendered request JSON is dispatched to the
-   * QueryController method matching the path of the bucket's apiPath. An empty bucket responds
-   * with {"isEmpty": true}. The bucket's impression id is attached as the impression header.
-   *
-   * NOTE(review): the match on `path` has no default case, so a path outside the listed ones
-   * raises a MatchError. The expression that preceded `{ r => ... }` is missing in this copy;
-   * restore it from the original file.
-   */
-  private def buildRequestInner(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
-    if (bucket.isEmpty) Future.successful(Ok(Json.obj("isEmpty" -> true)).withHeaders(impressionKey -> bucket.impressionId))
-    else {
-      val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
-      val url = new URL(bucket.apiPath)
-      val path = url.getPath()
-      // dummy log for sampling
-      val experimentLog = s"POST $path took -1 ms 200 -1 $jsonBody"
-      val response = path match {
-        case "/graphs/getEdges" => controllers.QueryController.getEdgesInner(jsonBody)
-        case "/graphs/getEdges/grouped" => controllers.QueryController.getEdgesWithGroupingInner(jsonBody)
-        case "/graphs/getEdgesExcluded" => controllers.QueryController.getEdgesExcludedInner(jsonBody)
-        case "/graphs/getEdgesExcluded/grouped" => controllers.QueryController.getEdgesExcludedWithGroupingInner(jsonBody)
-        case "/graphs/checkEdges" => controllers.QueryController.checkEdgesInner(jsonBody)
-        case "/graphs/getEdgesGrouped" => controllers.QueryController.getEdgesGroupedInner(jsonBody)
-        case "/graphs/getEdgesGroupedExcluded" => controllers.QueryController.getEdgesGroupedExcludedInner(jsonBody)
-        case "/graphs/getEdgesGroupedExcludedFormatted" => controllers.QueryController.getEdgesGroupedExcludedFormattedInner(jsonBody)
-      }
- { r => r.withHeaders(impressionKey -> bucket.impressionId) }
-    }
-  }
-  /** Keeps the first value of every key; keys whose value list is empty are dropped. */
-  private def toSimpleMap(map: Map[String, Seq[String]]): Map[String, String] = {
-    map.collect { case (k, vs) if vs.nonEmpty => k -> vs.head }
-  }
-  /**
-   * Proxies the request to the bucket's apiPath over HTTP: the rendered request JSON is sent with
-   * the bucket's HTTP verb (upper-cased), the incoming headers, and the first value of each
-   * incoming query-string parameter. The proxied status, headers (first value of each) and body
-   * are returned with the bucket's impression id attached as the impression header.
-   *
-   * NOTE(review): the call on `ws` that preceded the `{ case (proxyResponse, proxyBody) => ... }`
-   * block is missing in this copy; restore it from the original file.
-   */
-  private def buildRequest(request: Request[AnyContent], bucket: Bucket, uuid: String): Future[Result] = {
-    val jsonBody = makeRequestJson(request.body.asJson, bucket, uuid)
-    val url = bucket.apiPath
-    val headers = request.headers.toSimpleMap.toSeq
-    val verb = bucket.httpVerb.toUpperCase
-    val qs = toSimpleMap(request.queryString).toSeq
-    val ws = WS.url(url)
-      .withMethod(verb)
-      .withBody(jsonBody)
-      .withHeaders(headers: _*)
-      .withQueryString(qs: _*)
- {
-      case (proxyResponse, proxyBody) =>
-        Result(ResponseHeader(proxyResponse.status, proxyResponse.headers.mapValues(_.toList.head)), proxyBody).withHeaders(impressionKey -> bucket.impressionId)
-    }
-  }
diff --git a/app/controllers/JsonBodyParser.scala b/app/controllers/JsonBodyParser.scala
deleted file mode 100644
index 3e3d40c..0000000
--- a/app/controllers/JsonBodyParser.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-package controllers
-import com.kakao.s2graph.core.utils.logger
-import play.api.Play
-import play.api.libs.iteratee.Iteratee
-import play.api.libs.json.{JsValue, Json}
-import play.api.mvc._
-import scala.concurrent.Future
-import scala.util.control.NonFatal
- * Created by hsleep( on 15. 9. 1..
- */
-object s2parse extends BodyParsers {
-  import parse._
-  val defaultMaxTextLength = 1024 * 512
-  val defaultMaxJsonLength = 1024 * 512
-//  def json: BodyParser[JsValue] = json(DEFAULT_MAX_TEXT_LENGTH)
-  def json: BodyParser[JsValue] = json(defaultMaxTextLength)
-  def json(maxLength: Int): BodyParser[JsValue] = when(
-    _.contentType.exists(m => m.equalsIgnoreCase("text/json") || m.equalsIgnoreCase("application/json")),
-    tolerantJson(maxLength),
-    createBadResult("Expecting text/json or application/json body")
-  )
-  def tolerantJson(maxLength: Int): BodyParser[JsValue] =
-    tolerantBodyParser[JsValue]("json", maxLength, "Invalid Json") { (request, bytes) =>
-      // Encoding notes: RFC 4627 requires that JSON be encoded in Unicode, and states that whether that's
-      // UTF-8, UTF-16 or UTF-32 can be auto detected by reading the first two bytes. So we ignore the declared
-      // charset and don't decode, we passing the byte array as is because Jackson supports auto detection.
-      Json.parse(bytes)
-    }
-  private def tolerantBodyParser[A](name: String, maxLength: Int, errorMessage: String)(parser: (RequestHeader, Array[Byte]) => A): BodyParser[A] =
-    BodyParser(name + ", maxLength=" + maxLength) { request =>
-      import play.api.libs.iteratee.Execution.Implicits.trampoline
-      import play.api.libs.iteratee.Traversable
-      import scala.util.control.Exception._
-      val bodyParser: Iteratee[Array[Byte], Either[Result, Either[Future[Result], A]]] =
-        Traversable.takeUpTo[Array[Byte]](maxLength).transform(
-          Iteratee.consume[Array[Byte]]().map { bytes =>
-            allCatch[A].either {
-              parser(request, bytes)
-            } {
-              case NonFatal(e) =>
-                val txt = new String(bytes)
-                logger.error(s"$errorMessage: $txt", e)
-                createBadResult(s"$errorMessage: $e")(request)
-              case t => throw t
-            }
-          }
-        ).flatMap(Iteratee.eofOrElse(Results.EntityTooLarge))
-      bodyParser.mapM {
-        case Left(tooLarge) => Future.successful(Left(tooLarge))
-        case Right(Left(badResult)) =>
-        case Right(Right(body)) => Future.successful(Right(body))
-      }
-    }
-  private def createBadResult(msg: String): RequestHeader => Future[Result] = { request =>
-, msg))
-      .getOrElse(Future.successful(Results.BadRequest))
-  }