You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2019/01/25 22:48:55 UTC
[03/20] incubator-s2graph git commit: merge S2GRAPH-249
merge S2GRAPH-249
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b8ab86dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b8ab86dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b8ab86dd
Branch: refs/heads/master
Commit: b8ab86dd39fd3810afcf44160324fa2dc6f5ee3f
Parents: 4154bbe
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Nov 29 10:51:20 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Nov 29 10:51:20 2018 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/Management.scala | 44 ++++
.../org/apache/s2graph/core/schema/Label.scala | 38 ++--
.../apache/s2graph/http/S2GraphAdminRoute.scala | 224 ++++++++++++++++++-
.../s2graph/http/S2GraphMutateRoute.scala | 119 ++++++++++
.../scala/org/apache/s2graph/http/Server.scala | 4 +-
.../apache/s2graph/http/AdminRouteSpec.scala | 41 +++-
.../apache/s2graph/http/MutateRouteSpec.scala | 52 +++++
.../rest/play/controllers/AdminController.scala | 32 +--
8 files changed, 492 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 0c41ee3..78edf80 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -66,18 +66,56 @@ object Management {
}
object JsonModel {
+ import play.api.libs.functional.syntax._
case class Prop(name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)
object Prop extends ((String, String, String, Boolean) => Prop)
case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None)
+
+ case class HTableParams(cluster: String, hTableName: String,
+ preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
+
+ override def toString(): String = {
+ s"""HtableParams
+ |-- cluster : $cluster
+ |-- hTableName : $hTableName
+ |-- preSplitSize : $preSplitSize
+ |-- hTableTTL : $hTableTTL
+ |-- compressionAlgorithm : $compressionAlgorithm
+ |""".stripMargin
+ }
+ }
+
+ implicit object HTableParamsJsonConverter extends Format[HTableParams] {
+ def reads(json: JsValue): JsResult[HTableParams] = (
+ (__ \ "cluster").read[String] and
+ (__ \ "hTableName").read[String] and
+ (__ \ "preSplitSize").read[Int] and
+ (__ \ "hTableTTL").readNullable[Int] and
+ (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
+
+ def writes(o: HTableParams): JsValue = Json.obj(
+ "cluster" -> o.cluster,
+ "hTableName" -> o.hTableName,
+ "preSplitSize" -> o.preSplitSize,
+ "hTableTTL" -> o.hTableTTL,
+ "compressionAlgorithm" -> o.compressionAlgorithm
+ )
+ }
}
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
}
+ def findServiceColumn(serviceName: String, columnName: String): Option[ServiceColumn] = {
+ Service.findByName(serviceName, useCache = false).flatMap { service =>
+ ServiceColumn.find(service.id.get, columnName, useCache = false)
+ }
+ }
+
def deleteService(serviceName: String) = {
Service.findByName(serviceName).foreach { service =>
// service.deleteAll()
@@ -133,6 +171,12 @@ object Management {
Label.findByName(labelName, useCache = useCache)
}
+ def findLabels(serviceName: String, useCache: Boolean = false): Seq[Label] = {
+ Service.findByName(serviceName, useCache = useCache).map { service =>
+ Label.findBySrcServiceId(service.id.get, useCache = useCache)
+ }.getOrElse(Nil)
+ }
+
def deleteLabel(labelName: String): Try[Label] = {
Schema withTx { implicit session =>
val label = Label.findByName(labelName, useCache = false).getOrElse(throw GraphExceptions.LabelNotExistException(labelName))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index a359958..2780475 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -126,44 +126,50 @@ object Label extends SQLSyntaxSupport[Label] {
else sql.get
}
- def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+ def findByTgtColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
val cacheKey = className + "tgtColumnId=" + columnId
val col = ServiceColumn.findById(columnId)
- withCaches(cacheKey)(
- sql"""
+ lazy val sql = sql"""
select *
from labels
where tgt_column_name = ${col.columnName}
and service_id = ${col.serviceId}
and deleted_at is null
- """.map { rs => Label(rs) }.list().apply())
+ """.map { rs => Label(rs) }.list().apply()
+
+ if (useCache) withCaches(cacheKey)(sql)
+ else sql
}
- def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+ def findBySrcColumnId(columnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
val cacheKey = className + "srcColumnId=" + columnId
val col = ServiceColumn.findById(columnId)
- withCaches(cacheKey)(
- sql"""
+ lazy val sql = sql"""
select *
from labels
where src_column_name = ${col.columnName}
and service_id = ${col.serviceId}
and deleted_at is null
- """.map { rs => Label(rs) }.list().apply())
+ """.map { rs => Label(rs) }.list().apply()
+
+ if (useCache) withCaches(cacheKey)(sql)
+ else sql
}
- def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+ def findBySrcServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
val cacheKey = className + "srcServiceId=" + serviceId
- withCaches(cacheKey)(
- sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
- )
+ lazy val sql = sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+
+ if (useCache) withCaches(cacheKey)(sql)
+ else sql
}
- def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
+ def findByTgtServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[Label] = {
val cacheKey = className + "tgtServiceId=" + serviceId
- withCaches(cacheKey)(
- sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
- )
+ lazy val sql = sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
+
+ if (useCache) withCaches(cacheKey)(sql)
+ else sql
}
def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
index 8efd2ab..9bf7eb4 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphAdminRoute.scala
@@ -3,10 +3,10 @@ package org.apache.s2graph.http
import akka.http.scaladsl.model._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.{Management, S2Graph}
-
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
-
+import org.apache.s2graph.core.Management.JsonModel.HTableParams
+import org.apache.s2graph.core.schema._
import org.slf4j.LoggerFactory
import play.api.libs.json._
@@ -53,6 +53,16 @@ object S2GraphAdminRoute {
toHttpEntity(Option(res), status = status)
}
+
+ def toHttpEntity[A: AdminMessageFormatter](ls: Seq[A], status: StatusCode = StatusCodes.OK): HttpResponse = {
+ val ev = implicitly[AdminMessageFormatter[A]]
+ val res = ls.map(ev.toJson)
+
+ HttpResponse(
+ status = status,
+ entity = HttpEntity(ContentTypes.`application/json`, res.toString)
+ )
+ }
}
trait S2GraphAdminRoute extends PlayJsonSupport {
@@ -66,18 +76,40 @@ trait S2GraphAdminRoute extends PlayJsonSupport {
lazy val requestParser: RequestParser = new RequestParser(s2graph)
// routes impl
+ /* GET */
+ // GET /graphs/getService/:serviceName
+ lazy val getService = path("getService" / Segment) { serviceName =>
+ val serviceOpt = Management.findService(serviceName)
+
+ complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}"))
+ }
+
+ // GET /graphs/getServiceColumn/:serviceName/:columnName
+ lazy val getServiceColumn = path("getServiceColumn" / Segments) { params =>
+ val (serviceName, columnName) = params match {
+ case s :: c :: Nil => (s, c)
+ }
+
+ val ret = Management.findServiceColumn(serviceName, columnName)
+ complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
+ }
+
+ // GET /graphs/getLabel/:labelName
lazy val getLabel = path("getLabel" / Segment) { labelName =>
val labelOpt = Management.findLabel(labelName)
complete(toHttpEntity(labelOpt, message = s"Label not found: ${labelName}"))
}
- lazy val getService = path("getService" / Segment) { serviceName =>
- val serviceOpt = Management.findService(serviceName)
+ // GET /graphs/getLabels/:serviceName
+ lazy val getLabels = path("getLabels" / Segment) { serviceName =>
+ val ret = Management.findLabels(serviceName)
- complete(toHttpEntity(serviceOpt, message = s"Service not found: ${serviceName}"))
+ complete(toHttpEntity(ret))
}
+ /* POST */
+ // POST /graphs/createService
lazy val createService = path("createService") {
entity(as[JsValue]) { params =>
@@ -91,28 +123,204 @@ trait S2GraphAdminRoute extends PlayJsonSupport {
}
}
+ // POST /graphs/createServiceColumn
+ lazy val createServiceColumn = path("createServiceColumn") {
+ entity(as[JsValue]) { params =>
+
+ val parseTry = requestParser.toServiceColumnElements(params)
+ val serviceColumnTry = for {
+ (serviceName, columnName, columnType, props) <- parseTry
+ serviceColumn <- Try(management.createServiceColumn(serviceName, columnName, columnType, props))
+ } yield serviceColumn
+
+ complete(toHttpEntity(serviceColumnTry))
+ }
+ }
+
+ // POST /graphs/createLabel
lazy val createLabel = path("createLabel") {
entity(as[JsValue]) { params =>
+ val labelTry = requestParser.toLabelElements(params)
+
+ complete(toHttpEntity(labelTry))
+ }
+ }
+ // POST /graphs/addIndex
+ lazy val addIndex = path("addIndex") {
+ entity(as[JsValue]) { params =>
val labelTry = for {
- label <- requestParser.toLabelElements(params)
+ (labelName, indices) <- requestParser.toIndexElements(params)
+ label <- Management.addIndex(labelName, indices)
} yield label
complete(toHttpEntity(labelTry))
}
}
+ // POST /graphs/addProp/:labelName
+ lazy val addProp = path("addProp" / Segment) { labelName =>
+ entity(as[JsValue]) { params =>
+ val labelMetaTry = for {
+ prop <- requestParser.toPropElements(params)
+ labelMeta <- Management.addProp(labelName, prop)
+ } yield labelMeta
+
+ complete(toHttpEntity(labelMetaTry))
+ }
+ }
+
+ // POST /graphs/addServiceColumnProp/:serviceName/:columnName
+ lazy val addServiceColumnProp = path("addServiceColumnProp" / Segments) { params =>
+ val (serviceName, columnName, storeInGlobalIndex) = params match {
+ case s :: c :: Nil => (s, c, false)
+ case s :: c :: i :: Nil => (s, c, i.toBoolean)
+ }
+
+ entity(as[JsValue]) { params =>
+ val columnMetaOpt = for {
+ service <- Service.findByName(serviceName)
+ serviceColumn <- ServiceColumn.find(service.id.get, columnName)
+ prop <- requestParser.toPropElements(params).toOption
+ } yield {
+ ColumnMeta.findOrInsert(serviceColumn.id.get, prop.name, prop.dataType, prop.defaultValue, storeInGlobalIndex)
+ }
+
+ complete(toHttpEntity(columnMetaOpt, message = s"can`t find service with $serviceName or can`t find serviceColumn with $columnName"))
+ }
+ }
+
+ // POST /graphs/createHTable
+ lazy val createHTable = path("createHTable") {
+ entity(as[JsValue]) { params =>
+ params.validate[HTableParams] match {
+ case JsSuccess(hTableParams, _) => {
+ management.createStorageTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
+ hTableParams.preSplitSize, hTableParams.hTableTTL,
+ hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
+
+ complete(toHttpEntity(None, status = StatusCodes.OK, message = "created"))
+ }
+ case err@JsError(_) => complete(toHttpEntity(None, status = StatusCodes.BadRequest, message = Json.toJson(err).toString))
+ }
+ }
+ }
+
+ // POST /graphs/copyLabel/:oldLabelName/:newLabelName
+ lazy val copyLabel = path("copyLabel" / Segments) { params =>
+ val (oldLabelName, newLabelName) = params match {
+ case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
+ }
+
+ val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+
+ complete(toHttpEntity(copyTry))
+ }
+
+ // POST /graphs/renameLabel/:oldLabelName/:newLabelName
+ lazy val renameLabel = path("renameLabel" / Segments) { params =>
+ val (oldLabelName, newLabelName) = params match {
+ case oldLabel :: newLabel :: Nil => (oldLabel, newLabel)
+ }
+
+ Label.findByName(oldLabelName) match {
+ case None => complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label $oldLabelName not found."))
+ case Some(label) =>
+ Management.updateLabelName(oldLabelName, newLabelName)
+
+ complete(toHttpEntity(None, message = s"${label} was updated."))
+ }
+ }
+
+ // POST /graphs/swapLabels/:leftLabelName/:rightLabelName
+ lazy val swapLabel = path("swapLabel" / Segments) { params =>
+ val (leftLabelName, rightLabelName) = params match {
+ case left :: right :: Nil => (left, right)
+ }
+
+ val left = Label.findByName(leftLabelName, useCache = false)
+ val right = Label.findByName(rightLabelName, useCache = false)
+ // verify same schema
+
+ (left, right) match {
+ case (Some(l), Some(r)) =>
+ Management.swapLabelNames(leftLabelName, rightLabelName)
+
+ complete(toHttpEntity(None, message = s"Labels were swapped."))
+ case _ =>
+ complete(toHttpEntity(None, status = StatusCodes.NotFound, message = s"Label ${leftLabelName} or ${rightLabelName} not found."))
+ }
+ }
+
+ // POST /graphs/updateHTable/:labelName/:newHTableName
+ lazy val updateHTable = path("updateHTable" / Segments) { params =>
+ val (labelName, newHTableName) = params match {
+ case l :: h :: Nil => (l, h)
+ }
+
+ val updateTry = Management.updateHTable(labelName, newHTableName)
+
+ complete(toHttpEntity(updateTry))
+ }
+
+ /* PUT */
+ // PUT /graphs/deleteLabelReally/:labelName
+ lazy val deleteLabelReally = path("deleteLabelReally" / Segment) { labelName =>
+ val ret = Management.deleteLabel(labelName).toOption
+
+ complete(toHttpEntity(ret, message = s"Label not found: ${labelName}"))
+ }
+
+ // PUT /graphs/markDeletedLabel/:labelName
+ lazy val markDeletedLabel = path("markDeletedLabel" / Segment) { labelName =>
+ val ret = Management.markDeletedLabel(labelName).toOption
+
+ complete(toHttpEntity(ret, message = s"Label not found: ${labelName}"))
+ }
+
+ // PUT /graphs/deleteServiceColumn/:serviceName/:columnName
+ lazy val deleteServiceColumn = path("deleteServiceColumn" / Segments) { params =>
+ val (serviceName, columnName) = params match {
+ case s :: c :: Nil => (s, c)
+ }
+
+ val ret = Management.deleteColumn(serviceName, columnName).toOption
+
+ complete(toHttpEntity(ret, message = s"ServiceColumn not found: ${serviceName}, ${columnName}"))
+ }
+
+ // TODO:
+ // delete service?
+ // PUT /graphs/loadCache
+
// expose routes
lazy val adminRoute: Route =
get {
concat(
getService,
- getLabel
+ getServiceColumn,
+ getLabel,
+ getLabels
)
} ~ post {
concat(
createService,
- createLabel
+ createServiceColumn,
+ createLabel,
+ addIndex,
+ addProp,
+ addServiceColumnProp,
+ createHTable,
+ copyLabel,
+ renameLabel,
+ swapLabel,
+ updateHTable
+ )
+ } ~ put {
+ concat(
+ deleteLabelReally,
+ markDeletedLabel,
+ deleteServiceColumn
)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
new file mode 100644
index 0000000..fc3b768
--- /dev/null
+++ b/s2http/src/main/scala/org/apache/s2graph/http/S2GraphMutateRoute.scala
@@ -0,0 +1,119 @@
+package org.apache.s2graph.http
+
+import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes}
+import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server.{ExceptionHandler, Route}
+import com.fasterxml.jackson.core.JsonParseException
+import org.apache.s2graph.core.rest.RequestParser
+import org.apache.s2graph.core.storage.MutateResponse
+import org.apache.s2graph.core.{GraphElement, S2Graph}
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsValue, Json}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait S2GraphMutateRoute {
+
+ val s2graph: S2Graph
+ val logger = LoggerFactory.getLogger(this.getClass)
+ lazy val parser = new RequestParser(s2graph)
+
+ // lazy val requestParser = new RequestParser(s2graph)
+ lazy val exceptionHandler = ExceptionHandler {
+ case ex: JsonParseException =>
+ complete(StatusCodes.BadRequest -> ex.getMessage)
+ case ex: java.lang.IllegalArgumentException =>
+ complete(StatusCodes.BadRequest -> ex.getMessage)
+ }
+
+ lazy val mutateVertex = path("vertex" / Segments) { params =>
+ val (operation, serviceNameOpt, columnNameOpt) = params match {
+ case operation :: serviceName :: columnName :: Nil =>
+ (operation, Option(serviceName), Option(columnName))
+ case operation :: Nil =>
+ (operation, None, None)
+ }
+
+ entity(as[String]) { body =>
+ val payload = Json.parse(body)
+
+ implicit val ec = s2graph.ec
+
+ val future = vertexMutate(payload, operation, serviceNameOpt, columnNameOpt).map { mutateResponses =>
+ HttpResponse(
+ status = StatusCodes.OK,
+ entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
+ )
+ }
+
+ complete(future)
+ }
+ }
+
+ lazy val mutateEdge = path("edge" / Segment) { operation =>
+ entity(as[String]) { body =>
+ val payload = Json.parse(body)
+
+ implicit val ec = s2graph.ec
+
+ val future = edgeMutate(payload, operation, withWait = true).map { mutateResponses =>
+ HttpResponse(
+ status = StatusCodes.OK,
+ entity = HttpEntity(ContentTypes.`application/json`, Json.toJson(mutateResponses).toString)
+ )
+ }
+
+ complete(future)
+ }
+ }
+
+ def vertexMutate(jsValue: JsValue,
+ operation: String,
+ serviceNameOpt: Option[String] = None,
+ columnNameOpt: Option[String] = None,
+ withWait: Boolean = true)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ val vertices = parser.toVertices(jsValue, operation, serviceNameOpt, columnNameOpt)
+
+ val verticesToStore = vertices.filterNot(_.isAsync)
+
+ s2graph.mutateVertices(verticesToStore, withWait).map(_.map(_.isSuccess))
+ }
+
+ def edgeMutate(elementsWithTsv: Seq[(GraphElement, String)],
+ withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ val elementWithIdxs = elementsWithTsv.zipWithIndex
+
+ val (elementSync, elementAsync) = elementWithIdxs.partition { case ((element, tsv), idx) =>
+ !element.isAsync
+ }
+ val retToSkip = elementAsync.map(_._2 -> MutateResponse.Success)
+ val (elementsToStore, _) = elementSync.map(_._1).unzip
+ val elementsIdxToStore = elementSync.map(_._2)
+
+ s2graph.mutateElements(elementsToStore, withWait).map { mutateResponses =>
+ elementsIdxToStore.zip(mutateResponses) ++ retToSkip
+ }.map(_.sortBy(_._1).map(_._2.isSuccess))
+ }
+
+ def edgeMutate(jsValue: JsValue,
+ operation: String,
+ withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ val edgesWithTsv = parser.parseJsonFormat(jsValue, operation)
+ edgeMutate(edgesWithTsv, withWait)
+ }
+
+ // expose routes
+ lazy val mutateRoute: Route =
+ post {
+ concat(
+ handleExceptions(exceptionHandler) {
+ mutateVertex
+ },
+ handleExceptions(exceptionHandler) {
+ mutateEdge
+ }
+ )
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
----------------------------------------------------------------------
diff --git a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
index 82d2343..c26e314 100644
--- a/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
+++ b/s2http/src/main/scala/org/apache/s2graph/http/Server.scala
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory
object Server extends App
with S2GraphTraversalRoute
- with S2GraphAdminRoute {
+ with S2GraphAdminRoute
+ with S2GraphMutateRoute {
implicit val system: ActorSystem = ActorSystem("S2GraphHttpServer")
implicit val materializer: ActorMaterializer = ActorMaterializer()
@@ -54,6 +55,7 @@ object Server extends App
// Allows you to determine routes to expose according to external settings.
lazy val routes: Route = concat(
pathPrefix("graphs")(traversalRoute),
+ pathPrefix("mutate")(mutateRoute),
pathPrefix("admin")(adminRoute),
get(complete(health))
)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
----------------------------------------------------------------------
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
index 9239598..eade7e6 100644
--- a/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
+++ b/s2http/src/test/scala/org/apache/s2graph/http/AdminRouteSpec.scala
@@ -4,17 +4,16 @@ import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.Management.JsonModel.Prop
import org.apache.s2graph.core.S2Graph
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import org.slf4j.LoggerFactory
import play.api.libs.json.{JsString, JsValue, Json}
-class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll with PlayJsonSupport {
-
+class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with ScalatestRouteTest with S2GraphAdminRoute with BeforeAndAfterAll {
val config = ConfigFactory.load()
val s2graph = new S2Graph(config)
-
override val logger = LoggerFactory.getLogger(this.getClass)
override def afterAll(): Unit = {
@@ -23,14 +22,17 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
lazy val routes = adminRoute
+ val serviceName = "kakaoFavorites"
+ val columnName = "userName"
+
"AdminRoute" should {
"be able to create service (POST /createService)" in {
val serviceParam = Json.obj(
- "serviceName" -> "kakaoFavorites",
+ "serviceName" -> serviceName,
"compressionAlgorithm" -> "gz"
)
- val serviceEntity = Marshal(serviceParam).to[MessageEntity].futureValue
+ val serviceEntity = Marshal(serviceParam.toString).to[MessageEntity].futureValue
val request = Post("/createService").withEntity(serviceEntity)
request ~> routes ~> check {
@@ -45,7 +47,7 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
}
"return service if present (GET /getService/{serviceName})" in {
- val request = HttpRequest(uri = "/getService/kakaoFavorites")
+ val request = HttpRequest(uri = s"/getService/$serviceName")
request ~> routes ~> check {
status should ===(StatusCodes.OK)
@@ -56,6 +58,33 @@ class AdminRoutesSpec extends WordSpec with Matchers with ScalaFutures with Scal
(response \\ "name").head should ===(JsString("kakaoFavorites"))
}
}
+
+ "be able to create serviceColumn (POST /createServiceColumn)" in {
+ val serviceColumnParam = Json.obj(
+ "serviceName" -> serviceName,
+ "columnName" -> columnName,
+ "columnType" -> "string",
+ "props" -> Json.toJson(
+ Seq(
+ Json.obj("name" -> "age", "defaultValue" -> "-1", "dataType" -> "integer")
+ )
+ )
+ )
+
+ val serviceColumnEntity = Marshal(serviceColumnParam.toString).to[MessageEntity].futureValue
+ val request = Post("/createServiceColumn").withEntity(serviceColumnEntity)
+
+ request ~> routes ~> check {
+ status should ===(StatusCodes.Created)
+ contentType should ===(ContentTypes.`application/json`)
+
+ val response = entityAs[JsValue]
+
+ (response \\ "serviceName").head should ===(JsString("kakaoFavorites"))
+ (response \\ "columnName").head should ===(JsString("userName"))
+ (response \\ "status").head should ===(JsString("ok"))
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
----------------------------------------------------------------------
diff --git a/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
new file mode 100644
index 0000000..f823cd5
--- /dev/null
+++ b/s2http/src/test/scala/org/apache/s2graph/http/MutateRouteSpec.scala
@@ -0,0 +1,52 @@
+package org.apache.s2graph.http
+
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.testkit.ScalatestRouteTest
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.S2Graph
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+import org.slf4j.LoggerFactory
+import play.api.libs.json.{JsValue, Json}
+
+class MutateRouteSpec extends WordSpec with Matchers with PlayJsonSupport with ScalaFutures with ScalatestRouteTest with S2GraphMutateRoute with BeforeAndAfterAll {
+ val config = ConfigFactory.load()
+ val s2graph = new S2Graph(config)
+ override val logger = LoggerFactory.getLogger(this.getClass)
+
+ override def afterAll(): Unit = {
+ s2graph.shutdown()
+ }
+
+ lazy val routes = mutateRoute
+
+ val serviceName = "kakaoFavorites"
+ val columnName = "userName"
+
+ "MutateRoute" should {
+ "be able to insert vertex (POST /mutate/vertex/insert)" in {
+ // {"timestamp": 10, "serviceName": "s2graph", "columnName": "user", "id": 1, "props": {}}
+ val param = Json.obj(
+ "timestamp" -> 10,
+ "serviceName" -> serviceName,
+ "columnName" -> columnName,
+ "id" -> "user_a",
+ "props" -> Json.obj(
+ "age" -> 20
+ )
+ )
+ val entity = Marshal(param.toString).to[MessageEntity].futureValue
+ val request = Post("/vertex/insert").withEntity(entity)
+
+ request ~> routes ~> check {
+ status should ===(StatusCodes.OK)
+ contentType should ===(ContentTypes.`application/json`)
+
+ val response = entityAs[JsValue]
+ response should ===(Json.toJson(Seq(true)))
+ }
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8ab86dd/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
index 00702cf..fdd9b63 100644
--- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
+++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/AdminController.scala
@@ -20,6 +20,7 @@
package org.apache.s2graph.rest.play.controllers
import org.apache.s2graph.core.Management
+import org.apache.s2graph.core.Management.JsonModel.HTableParams
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.rest.RequestParser
import org.apache.s2graph.core.utils.logger
@@ -392,37 +393,6 @@ object AdminController extends Controller {
}
}
- case class HTableParams(cluster: String, hTableName: String,
- preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: Option[String]) {
-
- override def toString(): String = {
- s"""HtableParams
- |-- cluster : $cluster
- |-- hTableName : $hTableName
- |-- preSplitSize : $preSplitSize
- |-- hTableTTL : $hTableTTL
- |-- compressionAlgorithm : $compressionAlgorithm
- |""".stripMargin
- }
- }
-
- implicit object HTableParamsJsonConverter extends Format[HTableParams] {
- def reads(json: JsValue): JsResult[HTableParams] = (
- (__ \ "cluster").read[String] and
- (__ \ "hTableName").read[String] and
- (__ \ "preSplitSize").read[Int] and
- (__ \ "hTableTTL").readNullable[Int] and
- (__ \ "compressionAlgorithm").readNullable[String])(HTableParams.apply _).reads(json)
-
- def writes(o: HTableParams): JsValue = Json.obj(
- "cluster" -> o.cluster,
- "hTableName" -> o.hTableName,
- "preSplitSize" -> o.preSplitSize,
- "hTableTTL" -> o.hTableTTL,
- "compressionAlgorithm" -> o.compressionAlgorithm
- )
- }
-
implicit object JsErrorJsonWriter extends Writes[JsError] {
def writes(o: JsError): JsValue = Json.obj(
"errors" -> JsArray(