You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2019/01/25 22:48:55 UTC

[03/20] incubator-s2graph git commit: merge S2GRAPH-249

merge S2GRAPH-249


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b8ab86dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b8ab86dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b8ab86dd

Branch: refs/heads/master
Commit: b8ab86dd39fd3810afcf44160324fa2dc6f5ee3f
Parents: 4154bbe
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 29 10:51:20 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 29 10:51:20 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/Management.scala    |  44 ++++
 .../org/apache/s2graph/core/schema/Label.scala  |  38 ++--
 .../apache/s2graph/http/S2GraphAdminRoute.scala | 224 ++++++++++++++++++-
 .../s2graph/http/S2GraphMutateRoute.scala       | 119 ++++++++++
 .../scala/org/apache/s2graph/http/Server.scala  |   4 +-
 .../apache/s2graph/http/AdminRouteSpec.scala    |  41 +++-
 .../apache/s2graph/http/MutateRouteSpec.scala   |  52 +++++
 .../rest/play/controllers/AdminController.scala |  32 +--
 8 files changed, 492 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 0c41ee3..78edf80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -66,18 +66,56 @@ object Management {
   }
 
   object JsonModel {
+    import play.api.libs.functional.syntax._
 
     case class Prop(name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)
 
     object Prop extends ((String, String, String, Boolean) => Prop)
 
     case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None)
+
+    case class HTableParams(cluster: String, hTableName: String,
+                            preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
+
+      override def toString(): String = {
+        s"""HtableParams
+           |-- cluster : $cluster
+           |-- hTableName : $hTableName
+           |-- preSplitSize : $preSplitSize
+           |-- hTableTTL : $hTableTTL
+           |-- compressionAlgorithm : $compressionAlgorithm
+           |""".stripMargin
+      }
+    }
+
+    implicit object HTableParamsJsonConverter extends Format[HTableParams] {
+      def reads(json: JsValue): JsResult[HTableParams] = (
+        (__ \ "cluster").read[String] and
+          (__ \ "hTableName").read[String] and
+          (__ \ "preSplitSize").read[Int] and
+          (__ \ "hTableTTL").readNullable[Int] and
+          (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
+
+      def writes(o: HTableParams): JsValue = Json.obj(
+        "cluster" -> o.cluster,
+        "hTableName" -> o.hTableName,
+        "preSplitSize" -> o.preSplitSize,
+        "hTableTTL" -> o.hTableTTL,
+        "compressionAlgorithm" -> o.compressionAlgorithm
+      )
+    }
   }
 
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
   }
 
+  def findServiceColumn(serviceName: String, columnName: String): Option[ServiceColumn] = {
+    Service.findByName(serviceName, useCache = false).flatMap { service =>
+      ServiceColumn.find(service.id.get, columnName, useCache = false)
+    }
+  }
+
   def deleteService(serviceName: String) = {
     Service.findByName(serviceName).foreach { service =>
       //      service.deleteAll()
@@ -133,6 +171,12 @@ object Management {
     Label.findByName(labelName, useCache = useCache)
   }
 
+  def findLabels(serviceName: String, useCache: Boolean = false): Seq[Label] = {
+    Service.findByName(serviceName, useCache = useCache).map { service =>
+      Label.findBySrcServiceId(service.id.get, useCache = useCache)
+    }.getOrElse(Nil)
+  }
+
   def deleteLabel(labelName: String): Try[Label] = {
     Schema withTx { implicit session =>
       val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index a359958..2780475 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -126,44 +126,50 @@ object Label extends SQLSyntaxSupport[Label] {
     else sql.get
   }
 
-  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+  def findByTgtColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
     val cacheKey = className + "tgtColumnId=" + columnId
     val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
+    lazy val sql = sql"""
           select	*
           from	labels
           where	tgt_column_name = ${col.columnName}
           and service_id = ${col.serviceId}
           and deleted_at is null
-        """.map { rs => Label(rs) }.list().apply())
+        """.map { rs => Label(rs) }.list().apply()
+
+    if (useCache) withCaches(cacheKey)(sql)
+    else sql
   }
 
-  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+  def findBySrcColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
     val cacheKey = className + "srcColumnId=" + columnId
     val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
+    lazy val sql = sql"""
           select 	*
           from	labels
           where	src_column_name = ${col.columnName}
           and service_id = ${col.serviceId}
           and deleted_at is null
-        """.map { rs => Label(rs) }.list().apply())
+        """.map { rs => Label(rs) }.list().apply()
+
+    if (useCache) withCaches(cacheKey)(sql)
+    else sql
   }
 
-  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+  def findBySrcServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
     val cacheKey = className + "srcServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
-    )
+    lazy val sql = sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+
+    if (useCache) withCaches(cacheKey)(sql)
+    else sql
   }
 
-  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+  def findByTgtServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
     val cacheKey = className + "tgtServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
-    )
+    lazy val sql = sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+
+    if (useCache) withCaches(cacheKey)(sql)
+    else sql
   }
 
   def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
index 8efd2ab..9bf7eb4 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
@@ -3,10 +3,10 @@ package org.apache.s2graph.http
 import akka.http.scaladsl.model._
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.{Management, S2Graph}
-
 import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.server.Directives._
-
+import org.apache.s2graph.core.Management.JsonModel.HTableParams
+import org.apache.s2graph.core.schema._
 import org.slf4j.LoggerFactory
 import play.api.libs.json._
 
@@ -53,6 +53,16 @@ object S2GraphAdminRoute {
 
     toHttpEntity(Option(res), status = status)
   }
+
+  def toHttpEntity[A: AdminMessageFormatter](ls: Seq[A], status: StatusCode = StatusCodes.OK): HttpResponse = {
+    val ev = implicitly[AdminMessageFormatter[A]]
+    val res = ls.map(ev.toJson)
+
+    HttpResponse(
+      status = status,
+      entity = HttpEntity(ContentTypes.`application/json`, res.toString)
+    )
+  }
 }
 
 trait S2GraphAdminRoute extends PlayJsonSupport {
@@ -66,18 +76,40 @@ trait S2GraphAdminRoute extends PlayJsonSupport {
   lazy val requestParser: RequestParser = new RequestParser(s2graph)
 
   // routes impl
+  /* GET */
+  //  GET /graphs/getService/:serviceName
+  lazy val getService = path("getService" / Segment) { serviceName =>
+    val serviceOpt = Management.findService(serviceName)
+
+    complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}"))
+  }
+
+  //  GET /graphs/getServiceColumn/:serviceName/:columnName
+  lazy val getServiceColumn = path("getServiceColumn" / Segments) { params =>
+    val (serviceName, columnName) = params match {
+      case s :: c :: Nil => (s, c)
+    }
+
+    val ret = Management.findServiceColumn(serviceName, columnName)
+    complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
+  }
+
+  //  GET /graphs/getLabel/:labelName
   lazy val getLabel = path("getLabel" / Segment) { labelName =>
     val labelOpt = Management.findLabel(labelName)
 
     complete(toHttpEntity(labelOpt, message = s"Label not found: ${labelName}"))
   }
 
-  lazy val getService = path("getService" / Segment) { serviceName =>
-    val serviceOpt = Management.findService(serviceName)
+  //  GET /graphs/getLabels/:serviceName
+  lazy val getLabels = path("getLabels" / Segment) { serviceName =>
+    val ret = Management.findLabels(serviceName)
 
-    complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}"))
+    complete(toHttpEntity(ret))
   }
 
+  /* POST */
+  //  POST /graphs/createService
   lazy val createService = path("createService") {
     entity(as[JsValue]) { params =>
 
@@ -91,28 +123,204 @@ trait S2GraphAdminRoute extends PlayJsonSupport {
     }
   }
 
+  //  POST /graphs/createServiceColumn
+  lazy val createServiceColumn = path("createServiceColumn") {
+    entity(as[JsValue]) { params =>
+
+      val parseTry = requestParser.toServiceColumnElements(params)
+      val serviceColumnTry = for {
+        (serviceName, columnName, columnType, props) <- parseTry
+        serviceColumn <- Try(management.createServiceColumn(serviceName, columnName, columnType, props))
+      } yield serviceColumn
+
+      complete(toHttpEntity(serviceColumnTry))
+    }
+  }
+
+  //  POST /graphs/createLabel
   lazy val createLabel = path("createLabel") {
     entity(as[JsValue]) { params =>
+      val labelTry = requestParser.toLabelElements(params)
+
+      complete(toHttpEntity(labelTry))
+    }
+  }
 
+  //  POST /graphs/addIndex
+  lazy val addIndex = path("addIndex") {
+    entity(as[JsValue]) { params =>
       val labelTry = for {
-        label <- requestParser.toLabelElements(params)
+        (labelName, indices) <- requestParser.toIndexElements(params)
+        label <- Management.addIndex(labelName, indices)
       } yield label
 
       complete(toHttpEntity(labelTry))
     }
   }
 
+  //  POST /graphs/addProp/:labelName
+  lazy val addProp = path("addProp" / Segment) { labelName =>
+    entity(as[JsValue]) { params =>
+      val labelMetaTry = for {
+        prop <- requestParser.toPropElements(params)
+        labelMeta <- Management.addProp(labelName, prop)
+      } yield labelMeta
+
+      complete(toHttpEntity(labelMetaTry))
+    }
+  }
+
+  //  POST /graphs/addServiceColumnProp/:serviceName/:columnName
+  lazy val addServiceColumnProp = path("addServiceColumnProp" / Segments) { params =>
+    val (serviceName, columnName, storeInGlobalIndex) = params match {
+      case s :: c :: Nil => (s, c, false)
+      case s :: c :: i :: Nil => (s, c, i.toBoolean)
+    }
+
+    entity(as[JsValue]) { params =>
+      val columnMetaOpt = for {
+        service <- Service.findByName(serviceName)
+        serviceColumn <- ServiceColumn.find(service.id.get, columnName)
+        prop <- requestParser.toPropElements(params).toOption
+      } yield {
+        ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.dataType, prop.defaultValue, storeInGlobalIndex)
+      }
+
+      complete(toHttpEntity(columnMetaOpt, message = s"can`t find service with $serviceName or can`t find serviceColumn with $columnName"))
+    }
+  }
+
+  //  POST /graphs/createHTable
+  lazy val createHTable = path("createHTable") {
+    entity(as[JsValue]) { params =>
+      params.validate[HTableParams] match {
+        case JsSuccess(hTableParams, _) => {
+          management.createStorageTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
+            hTableParams.preSplitSize, hTableParams.hTableTTL,
+            hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
+
+          complete(toHttpEntity(None, status = StatusCodes.OK, message = "created"))
+        }
+        case err@JsError(_) => complete(toHttpEntity(None, status = StatusCodes.BadRequest, message = Json.toJson(err).toString))
+      }
+    }
+  }
+
+  //  POST /graphs/copyLabel/:oldLabelName/:newLabelName
+  lazy val copyLabel = path("copyLabel" / Segments) { params =>
+    val (oldLabelName, newLabelName) = params match {
+      case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
+    }
+
+    val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+
+    complete(toHttpEntity(copyTry))
+  }
+
+  //  POST /graphs/renameLabel/:oldLabelName/:newLabelName
+  lazy val renameLabel = path("renameLabel" / Segments) { params =>
+    val (oldLabelName, newLabelName) = params match {
+      case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
+    }
+
+    Label.findByName(oldLabelName) match {
+      case None => complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label $oldLabelName not found."))
+      case Some(label) =>
+        Management.updateLabelName(oldLabelName, newLabelName)
+
+        complete(toHttpEntity(None, message = s"${label} was updated."))
+    }
+  }
+
+  //  POST /graphs/swapLabels/:leftLabelName/:rightLabelName
+  lazy val swapLabel = path("swapLabel" / Segments) { params =>
+    val (leftLabelName, rightLabelName) = params match {
+      case left :: right :: Nil => (left, right)
+    }
+
+    val left = Label.findByName(leftLabelName, useCache = false)
+    val right = Label.findByName(rightLabelName, useCache = false)
+    // verify same schema
+
+    (left, right) match {
+      case (Some(l), Some(r)) =>
+        Management.swapLabelNames(leftLabelName, rightLabelName)
+
+        complete(toHttpEntity(None, message = s"Labels were swapped."))
+      case _ =>
+        complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found."))
+    }
+  }
+
+  //  POST /graphs/updateHTable/:labelName/:newHTableName
+  lazy val updateHTable = path("updateHTable" / Segments) { params =>
+    val (labelName, newHTableName) = params match {
+      case l :: h :: Nil => (l, h)
+    }
+
+    val updateTry = Management.updateHTable(labelName, newHTableName)
+
+    complete(toHttpEntity(updateTry))
+  }
+
+  /* PUT */
+  //  PUT /graphs/deleteLabelReally/:labelName
+  lazy val deleteLabelReally = path("deleteLabelReally" / Segment) { labelName =>
+    val ret = Management.deleteLabel(labelName).toOption
+
+    complete(toHttpEntity(ret, message = s"Label not found: ${labelName}"))
+  }
+
+  //  PUT /graphs/markDeletedLabel/:labelName
+  lazy val markDeletedLabel = path("markDeletedLabel" / Segment) { labelName =>
+    val ret = Management.markDeletedLabel(labelName).toOption
+
+    complete(toHttpEntity(ret, message = s"Label not found: ${labelName}"))
+  }
+
+  //  PUT /graphs/deleteServiceColumn/:serviceName/:columnName
+  lazy val deleteServiceColumn = path("deleteServiceColumn" / Segments) { params =>
+    val (serviceName, columnName) = params match {
+      case s :: c :: Nil => (s, c)
+    }
+
+    val ret = Management.deleteColumn(serviceName, columnName).toOption
+
+    complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
+  }
+
+  //  TODO:
+  // delete service?
+  //  PUT /graphs/loadCache
+
   // expose routes
   lazy val adminRoute: Route =
     get {
       concat(
         getService,
-        getLabel
+        getServiceColumn,
+        getLabel,
+        getLabels
       )
     } ~ post {
       concat(
         createService,
-        createLabel
+        createServiceColumn,
+        createLabel,
+        addIndex,
+        addProp,
+        addServiceColumnProp,
+        createHTable,
+        copyLabel,
+        renameLabel,
+        swapLabel,
+        updateHTable
+      )
+    } ~ put {
+      concat(
+        deleteLabelReally,
+        markDeletedLabel,
+        deleteServiceColumn
       )
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
new file mode 100644
index 0000000..fc3b768
--- /dev/null
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
@@ -0,0 +1,119 @@
+package org.apache.s2graph.http
+
+import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{ExceptionHandler, Route}
+import com.fasterxml.jackson.core.JsonParseException
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.storage.MutateResponse
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait S2GraphMutateRoute {
+
+  val s2graph: S2Graph
+  val logger = LoggerFactory.getLogger(this.getClass)
+  lazy val parser = new RequestParser(s2graph)
+
+  //  lazy val requestParser = new RequestParser(s2graph)
+  lazy val exceptionHandler = ExceptionHandler {
+    case ex: JsonParseException =>
+      complete(StatusCodes.BadRequest -> ex.getMessage)
+    case ex: java.lang.IllegalArgumentException =>
+      complete(StatusCodes.BadRequest -> ex.getMessage)
+  }
+
+  lazy val mutateVertex = path("vertex" / Segments) { params =>
+    val (operation, serviceNameOpt, columnNameOpt) = params match {
+      case operation :: serviceName :: columnName :: Nil =>
+        (operation, Option(serviceName), Option(columnName))
+      case operation :: Nil =>
+        (operation, None, None)
+    }
+
+    entity(as[String]) { body =>
+      val payload = Json.parse(body)
+
+      implicit val ec = s2graph.ec
+
+      val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map { mutateResponses =>
+        HttpResponse(
+          status = StatusCodes.OK,
+          entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
+        )
+      }
+
+      complete(future)
+    }
+  }
+
+  lazy val mutateEdge = path("edge" / Segment) { operation =>
+    entity(as[String]) { body =>
+      val payload = Json.parse(body)
+
+      implicit val ec = s2graph.ec
+
+      val future = edgeMutate(payload, operation, withWait = true).map { mutateResponses =>
+        HttpResponse(
+          status = StatusCodes.OK,
+          entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
+        )
+      }
+
+      complete(future)
+    }
+  }
+
+  def vertexMutate(jsValue: JsValue,
+                   operation: String,
+                   serviceNameOpt: Option[String] = None,
+                   columnNameOpt: Option[String] = None,
+                   withWait: Boolean = true)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    val vertices = parser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
+
+    val verticesToStore = vertices.filterNot(_.isAsync)
+
+    s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess))
+  }
+
+  def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)],
+                 withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    val elementWithIdxs = elementsWithTsv.zipWithIndex
+
+    val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
+      !element.isAsync
+    }
+    val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success)
+    val (elementsToStore, _) = elementSync.map(_._1).unzip
+    val elementsIdxToStore = elementSync.map(_._2)
+
+    s2graph.mutateElements(elementsToStore, withWait).map { mutateResponses =>
+      elementsIdxToStore.zip(mutateResponses) ++ retToSkip
+    }.map(_.sortBy(_._1).map(_._2.isSuccess))
+  }
+
+  def edgeMutate(jsValue: JsValue,
+                 operation: String,
+                 withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    val edgesWithTsv = parser.parseJsonFormat(jsValue, operation)
+    edgeMutate(edgesWithTsv, withWait)
+  }
+
+  // expose routes
+  lazy val mutateRoute: Route =
+    post {
+      concat(
+        handleExceptions(exceptionHandler) {
+          mutateVertex
+        },
+        handleExceptions(exceptionHandler) {
+          mutateEdge
+        }
+      )
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
index 82d2343..c26e314 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory
 
 object Server extends App
   with S2GraphTraversalRoute
-  with S2GraphAdminRoute {
+  with S2GraphAdminRoute
+  with S2GraphMutateRoute {
 
   implicit val system: ActorSystem = ActorSystem("S2GraphHttpServer")
   implicit val materializer: ActorMaterializer = ActorMaterializer()
@@ -54,6 +55,7 @@ object Server extends App
   // Allows you to determine routes to expose according to external settings.
   lazy val routes: Route = concat(
     pathPrefix("graphs")(traversalRoute),
+    pathPrefix("mutate")(mutateRoute),
     pathPrefix("admin")(adminRoute),
     get(complete(health))
   )

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
----------------------------------------------------------------------
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
index 9239598..eade7e6 100644
--- a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
+++ b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
@@ -4,17 +4,16 @@ import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.testkit.ScalatestRouteTest
 import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Management.JsonModel.Prop
 import org.apache.s2graph.core.S2Graph
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
 import org.slf4j.LoggerFactory
 import play.api.libs.json.{JsString, JsValue, Json}
 
-class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll with PlayJsonSupport {
-
+class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll {
   val config = ConfigFactory.load()
   val s2graph = new S2Graph(config)
-
   override val logger = LoggerFactory.getLogger(this.getClass)
 
   override def afterAll(): Unit = {
@@ -23,14 +22,17 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
 
   lazy val routes = adminRoute
 
+  val serviceName = "kakaoFavorites"
+  val columnName = "userName"
+
   "AdminRoute" should {
     "be able to create service (POST /createService)" in {
       val serviceParam = Json.obj(
-        "serviceName" -> "kakaoFavorites",
+        "serviceName" -> serviceName,
         "compressionAlgorithm" -> "gz"
       )
 
-      val serviceEntity = Marshal(serviceParam).to[MessageEntity].futureValue
+      val serviceEntity = Marshal(serviceParam.toString).to[MessageEntity].futureValue
       val request = Post("/createService").withEntity(serviceEntity)
 
       request ~> routes ~> check {
@@ -45,7 +47,7 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
     }
 
     "return service if present (GET /getService/{serviceName})" in {
-      val request = HttpRequest(uri = "/getService/kakaoFavorites")
+      val request = HttpRequest(uri = s"/getService/$serviceName")
 
       request ~> routes ~> check {
         status should ===(StatusCodes.OK)
@@ -56,6 +58,33 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
         (response \\ "name").head should ===(JsString("kakaoFavorites"))
       }
     }
+
+    "be able to create serviceColumn (POST /createServiceColumn)" in {
+      val serviceColumnParam = Json.obj(
+        "serviceName" -> serviceName,
+        "columnName" -> columnName,
+        "columnType" -> "string",
+        "props" -> Json.toJson(
+          Seq(
+            Json.obj("name" -> "age", "defaultValue" -> "-1", "dataType" -> "integer")
+          )
+        )
+      )
+
+      val serviceColumnEntity = Marshal(serviceColumnParam.toString).to[MessageEntity].futureValue
+      val request = Post("/createServiceColumn").withEntity(serviceColumnEntity)
+
+      request ~> routes ~> check {
+        status should ===(StatusCodes.Created)
+        contentType should ===(ContentTypes.`application/json`)
+
+        val response = entityAs[JsValue]
+
+        (response \\ "serviceName").head should ===(JsString("kakaoFavorites"))
+        (response \\ "columnName").head should ===(JsString("userName"))
+        (response \\ "status").head should ===(JsString("ok"))
+      }
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
----------------------------------------------------------------------
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
new file mode 100644
index 0000000..f823cd5
--- /dev/null
+++ b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.http
+
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.S2Graph
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsValue, Json}
+
+class MutateRouteSpec extends WordSpec with Matchers with PlayJsonSupport with ScalaFutures with ScalatestRouteTest with S2GraphMutateRoute with BeforeAndAfterAll {
+  val config = ConfigFactory.load()
+  val s2graph = new S2Graph(config)
+  override val logger = LoggerFactory.getLogger(this.getClass)
+
+  override def afterAll(): Unit = {
+    s2graph.shutdown()
+  }
+
+  lazy val routes = mutateRoute
+
+  val serviceName = "kakaoFavorites"
+  val columnName = "userName"
+
+  "MutateRoute" should {
+    "be able to insert vertex (POST /mutate/vertex/insert)" in {
+      //      {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}}
+      val param = Json.obj(
+        "timestamp" -> 10,
+        "serviceName" -> serviceName,
+        "columnName" -> columnName,
+        "id" -> "user_a",
+        "props" -> Json.obj(
+          "age" -> 20
+        )
+      )
+      val entity = Marshal(param.toString).to[MessageEntity].futureValue
+      val request = Post("/vertex/insert").withEntity(entity)
+
+      request ~> routes ~> check {
+        status should ===(StatusCodes.OK)
+        contentType should ===(ContentTypes.`application/json`)
+
+        val response = entityAs[JsValue]
+        response should ===(Json.toJson(Seq(true)))
+      }
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index 00702cf..fdd9b63 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -20,6 +20,7 @@
 package org.apache.s2graph.rest.play.controllers
 
 import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.Management.JsonModel.HTableParams
 import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.rest.RequestParser
 import org.apache.s2graph.core.utils.logger
@@ -392,37 +393,6 @@ object AdminController extends Controller {
     }
   }
 
-  case class HTableParams(cluster: String, hTableName: String,
-    preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
-
-    override def toString(): String = {
-      s"""HtableParams
-         |-- cluster : $cluster
-         |-- hTableName : $hTableName
-         |-- preSplitSize : $preSplitSize
-         |-- hTableTTL : $hTableTTL
-         |-- compressionAlgorithm : $compressionAlgorithm
-         |""".stripMargin
-    }
-  }
-
-  implicit object HTableParamsJsonConverter extends Format[HTableParams] {
-    def reads(json: JsValue): JsResult[HTableParams] = (
-    (__ \ "cluster").read[String] and
-    (__ \ "hTableName").read[String] and
-    (__ \ "preSplitSize").read[Int] and
-    (__ \ "hTableTTL").readNullable[Int] and
-      (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
-
-    def writes(o: HTableParams): JsValue = Json.obj(
-      "cluster" -> o.cluster,
-      "hTableName" -> o.hTableName,
-      "preSplitSize" -> o.preSplitSize,
-      "hTableTTL" -> o.hTableTTL,
-      "compressionAlgorithm" -> o.compressionAlgorithm
-    )
-  }
-
   implicit object JsErrorJsonWriter extends Writes[JsError] {
     def writes(o: JsError): JsValue = Json.obj(
       "errors" -> JsArray(