You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/26 03:09:52 UTC
[3/7] incubator-s2graph git commit: - rename mysqls package to
schema. - remove GlobalIndex. - add toBytes, fromBytes on Schema.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
deleted file mode 100644
index 0b16449..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.GraphUtil
-import scalikejdbc._
-
-import scala.util.{Try, Random}
-
-object Experiment extends Model[Experiment] {
- val ImpressionKey = "S2-Impression-Id"
- val ImpressionId = "Impression-Id"
-
- def apply(rs: WrappedResultSet): Experiment = {
- Experiment(rs.intOpt("id"),
- rs.int("service_id"),
- rs.string("name"),
- rs.string("description"),
- rs.string("experiment_type"),
- rs.int("total_modular"))
- }
-
- def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = {
- val cacheKey = "serviceId=" + serviceId
- withCaches(cacheKey) {
- sql"""select * from experiments where service_id = ${serviceId}"""
- .map { rs => Experiment(rs) }.list().apply()
- }
- }
-
- def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = {
- val cacheKey = "serviceId=" + serviceId + ":name=" + name
- withCache(cacheKey) {
- sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
- .map { rs => Experiment(rs) }.single.apply
- }
- }
-
- def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = {
- val cacheKey = "id=" + id
- withCache(cacheKey)(
- sql"""select * from experiments where id = ${id}"""
- .map { rs => Experiment(rs) }.single.apply
- )
- }
-
- def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100)
- (implicit session: DBSession = AutoSession): Try[Experiment] = {
- Try {
- sql"""INSERT INTO experiments(service_id, service_name, `name`, description, experiment_type, total_modular)
- VALUES(${service.id.get}, ${service.serviceName}, $name, $description, $experimentType, $totalModular)"""
- .updateAndReturnGeneratedKey().apply()
- }.map { newId =>
- Experiment(Some(newId.toInt), service.id.get, name, description, experimentType, totalModular)
- }
- }
-}
-
-case class Experiment(id: Option[Int],
- serviceId: Int,
- name: String,
- description: String,
- experimentType: String,
- totalModular: Int) {
-
- def buckets = Bucket.finds(id.get)
-
- def rangeBuckets = for {
- bucket <- buckets
- range <- bucket.rangeOpt
- } yield range -> bucket
-
-
- def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = {
- impIdOpt match {
- case Some(impId) => Bucket.findByImpressionId(impId)
- case None =>
- val seed = experimentType match {
- case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
- case _ => Random.nextInt(totalModular) + 1
- }
- findBucket(seed)
- }
- }
-
- def findBucket(uuidMod: Int): Option[Bucket] = {
- rangeBuckets.find { case ((from, to), bucket) =>
- from <= uuidMod && uuidMod <= to
- }.map(_._2)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
deleted file mode 100644
index 501a964..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet}
-import scalikejdbc._
-
-object GlobalIndex extends Model[GlobalIndex] {
- val vidField = "_vid_"
- val eidField = "_eid_"
- val labelField = "_label_"
- val serviceField = "_service_"
- val serviceColumnField = "_serviceColumn_"
- val EdgeType = "edge"
- val VertexType = "vertex"
- val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
-
-// val IndexName = "global_indices"
- val VertexIndexName = "global_vertex_index"
- val EdgeIndexName = "global_edge_index"
- val TypeName = "test"
-
- def apply(rs: WrappedResultSet): GlobalIndex = {
- GlobalIndex(rs.intOpt("id"),
- rs.string("element_type"),
- rs.string("prop_names").split(",").sorted,
- rs.string("index_name"))
- }
-
- def findBy(elementType: String, indexName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
- val cacheKey = s"elementType=$elementType:indexName=$indexName"
- lazy val sql = sql"""select * from global_indices where element_type = ${elementType} and index_name = $indexName""".map { rs => GlobalIndex(rs) }.single.apply()
-
- if (useCache) withCache(cacheKey){sql}
- else sql
- }
-
- def insert(elementType: String, indexName: String, propNames: Seq[String])(implicit session: DBSession = AutoSession): Long = {
- val allPropNames = (hiddenIndexFields.toSeq ++ propNames).sorted
- sql"""insert into global_indices(element_type, prop_names, index_name)
- values($elementType, ${allPropNames.mkString(",")}, $indexName)"""
- .updateAndReturnGeneratedKey.apply()
- }
-
- def findAll(elementType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[GlobalIndex] = {
- lazy val ls = sql"""select * from global_indices where element_type = $elementType""".map { rs => GlobalIndex(rs) }.list.apply
- if (useCache) {
- listCache.withCache(s"findAll:elementType=$elementType") {
- putsToCache(ls.map { globalIndex =>
- val cacheKey = s"elementType=${globalIndex.elementType}:indexName=${globalIndex.indexName}"
- cacheKey -> globalIndex
- })
- ls
- }
- } else {
- ls
- }
- }
-
- def findGlobalIndex(elementType: String, hasContainers: java.util.List[HasContainer])(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
- import scala.collection.JavaConversions._
- val indices = findAll(elementType, useCache = true)
- val keys = hasContainers.map(_.getKey)
-
- val sorted = indices.map { index =>
- val matched = keys.filter(index.propNamesSet)
- index -> matched.length
- }.filter(_._2 > 0).sortBy(_._2 * -1)
-
- sorted.headOption.map(_._1)
- }
-
-}
-
-case class GlobalIndex(id: Option[Int],
- elementType: String,
- propNames: Seq[String],
- indexName: String) {
- val backendIndexName = indexName + "_" + elementType
- val backendIndexNameWithType = backendIndexName + "/test1"
- lazy val propNamesSet = propNames.toSet
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
deleted file mode 100644
index c128163..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.Calendar
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
-import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
-import scalikejdbc._
-
-object Label extends Model[Label] {
-
- val maxHBaseTableNames = 2
-
- def apply(rs: WrappedResultSet): Label = {
- Label(Option(rs.int("id")), rs.string("label"),
- rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
- rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
- rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
- rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
- rs.string("compressionAlgorithm"), rs.stringOpt("options"))
- }
-
- def deleteAll(label: Label)(implicit session: DBSession) = {
- val id = label.id
- LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
- LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
- Label.delete(id.get)
- }
-
-
- def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
- val cacheKey = "label=" + labelName
- lazy val labelOpt =
- sql"""
- select *
- from labels
- where label = ${labelName}
- and deleted_at is null """.map { rs => Label(rs) }.single.apply()
-
- if (useCache) withCache(cacheKey)(labelOpt)
- else labelOpt
- }
-
- def insert(label: String,
- srcServiceId: Int,
- srcColumnName: String,
- srcColumnType: String,
- tgtServiceId: Int,
- tgtColumnName: String,
- tgtColumnType: String,
- isDirected: Boolean,
- serviceName: String,
- serviceId: Int,
- consistencyLevel: String,
- hTableName: String,
- hTableTTL: Option[Int],
- schemaVersion: String,
- isAsync: Boolean,
- compressionAlgorithm: String,
- options: Option[String])(implicit session: DBSession = AutoSession) = {
- sql"""
- insert into labels(label,
- src_service_id, src_column_name, src_column_type,
- tgt_service_id, tgt_column_name, tgt_column_type,
- is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
- compressionAlgorithm, options)
- values (${label},
- ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
- ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
- ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
- ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
- """
- .updateAndReturnGeneratedKey.apply()
- }
-
- def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
- val cacheKey = "id=" + id
- withCache(cacheKey)(
- sql"""
- select *
- from labels
- where id = ${id}
- and deleted_at is null"""
- .map { rs => Label(rs) }.single.apply())
- }
-
- def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
- val cacheKey = "id=" + id
- withCache(cacheKey)(
- sql"""
- select *
- from labels
- where id = ${id}
- and deleted_at is null"""
- .map { rs => Label(rs) }.single.apply()).get
- }
-
- def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "tgtColumnId=" + columnId
- val col = ServiceColumn.findById(columnId)
- withCaches(cacheKey)(
- sql"""
- select *
- from labels
- where tgt_column_name = ${col.columnName}
- and service_id = ${col.serviceId}
- and deleted_at is null
- """.map { rs => Label(rs) }.list().apply())
- }
-
- def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "srcColumnId=" + columnId
- val col = ServiceColumn.findById(columnId)
- withCaches(cacheKey)(
- sql"""
- select *
- from labels
- where src_column_name = ${col.columnName}
- and service_id = ${col.serviceId}
- and deleted_at is null
- """.map { rs => Label(rs) }.list().apply())
- }
-
- def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "srcServiceId=" + serviceId
- withCaches(cacheKey)(
- sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
- )
- }
-
- def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
- val cacheKey = "tgtServiceId=" + serviceId
- withCaches(cacheKey)(
- sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
- )
- }
-
- def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
- tgtServiceName: String, tgtColumnName: String, tgtColumnType: String,
- isDirected: Boolean = true,
- serviceName: String,
- indices: Seq[Index],
- metaProps: Seq[Prop],
- consistencyLevel: String,
- hTableName: Option[String],
- hTableTTL: Option[Int],
- schemaVersion: String,
- isAsync: Boolean,
- compressionAlgorithm: String,
- options: Option[String])(implicit session: DBSession = AutoSession): Label = {
-
- val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
- val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
- val serviceOpt = Service.findByName(serviceName, useCache = false)
- if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.")
- if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.")
- if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.")
-
- val newLabel = for {
- srcService <- srcServiceOpt
- tgtService <- tgtServiceOpt
- service <- serviceOpt
- } yield {
- val srcServiceId = srcService.id.get
- val tgtServiceId = tgtService.id.get
- val serviceId = service.id.get
-
- /* insert serviceColumn */
- val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
- val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
-
- if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
- if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
-
- /* create label */
- Label.findByName(labelName, useCache = false).getOrElse {
-
- val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
- tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
- hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
- compressionAlgorithm, options).toInt
-
- val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
- val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex)
- (propName -> labelMeta.seq)
- }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap
-
- if (indices.isEmpty) {
- // make default index with _PK, _timestamp, 0
- LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
- } else {
- indices.foreach { index =>
- val metaSeq = index.propNames.map { name => labelMetaMap(name) }
- LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
- }
- }
-
- val cacheKeys = List(s"id=$createdId", s"label=$labelName")
- val ret = findByName(labelName, useCache = false).get
- putsToCache(cacheKeys.map(k => k -> ret))
- ret
- }
- }
-
- newLabel.getOrElse(throw new RuntimeException("failed to create label"))
- }
-
- def findAll()(implicit session: DBSession = AutoSession) = {
- val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()
-
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
- })
-
- putsToCache(ls.map { x =>
- val cacheKey = s"label=${x.label}"
- (cacheKey -> x)
- })
-
- ls
- }
-
- def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = {
- logger.info(s"rename label: $oldName -> $newName")
- sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply()
- }
-
- def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = {
- logger.info(s"update HTable of label $labelName to $newHTableName")
- val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
- val label = Label.findByName(labelName, useCache = false).get
-
- val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- cnt
- }
-
- def delete(id: Int)(implicit session: DBSession = AutoSession) = {
- val label = findById(id)
- logger.info(s"delete label: $label")
- val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
- val cacheKeys = List(s"id=$id", s"label=${label.label}")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- cnt
- }
-
- def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {
-
- logger.info(s"mark deleted label: $label")
- val oldName = label.label
- val now = Calendar.getInstance().getTime
- val newName = s"deleted_${now.getTime}_"+ label.label
- val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
- val cacheKeys = List(s"id=${label.id}", s"label=${oldName}")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- cnt
- }
-}
-
-case class Label(id: Option[Int], label: String,
- srcServiceId: Int, srcColumnName: String, srcColumnType: String,
- tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
- isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
- hTableName: String, hTableTTL: Option[Int],
- schemaVersion: String, isAsync: Boolean = false,
- compressionAlgorithm: String,
- options: Option[String]) {
- def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache)
-
- def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache)
-
- // lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
- lazy val srcService = Service.findById(srcServiceId)
- lazy val tgtService = Service.findById(tgtServiceId)
- lazy val service = Service.findById(serviceId)
- /**
- * TODO
- * change this to apply hbase table from target serviceName
- */
- // lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
- // lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
- // lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
- lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)
-
- lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
- lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
-
- lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
-
- //TODO: Make sure this is correct
-
-// lazy val metas = metas(useCache = true)
- lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
- lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
- lazy val labelMetaSet = labelMetas.toSet
- lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
-
- lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
- lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
- lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
- lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
- // indices filterNot (_.id.get == defaultIndex.get.id.get)
- lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
-
- lazy val metaProps = LabelMeta.reservedMetas.map { m =>
- if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
- else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
- else m
- } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
- lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
- if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
- else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
- else m
- } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
- lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
- lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
- lazy val metaPropNames = metaProps.map(x => x.name)
- lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
-
- /** this is used only by edgeToProps */
- lazy val metaPropsDefaultMap = (for {
- prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
- jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
- } yield prop.name -> jsValue).toMap
-
- lazy val metaPropsDefaultMapInnerString = (for {
- prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
- innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
- } yield prop.name -> innerVal).toMap
-
- lazy val metaPropsDefaultMapInner = (for {
- prop <- metaPropsInner
- innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
- } yield prop -> innerVal).toMap
- lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
- lazy val metaPropsJsValueWithDefault = (for {
- prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
- jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
- } yield prop -> jsValue).toMap
-// lazy val extraOptions = Model.extraOptions(Option("""{
-// "storage": {
-// "s2graph.storage.backend": "rocks",
-// "rocks.db.path": "/tmp/db"
-// }
-// }"""))
-
- lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) {
- case JsArray(tokens) => tokens.map(_.as[String]).toSet
- case _ =>
- logger.error("Invalid token JSON")
- Set.empty[String]
- }
-
- lazy val extraOptions = Model.extraOptions(options)
-
- lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
-
- lazy val storageConfigOpt: Option[Config] = toStorageConfig
-
- def toStorageConfig: Option[Config] = {
- Model.toStorageConfig(extraOptions)
- }
-
-
- def srcColumnWithDir(dir: Int) = {
- // GraphUtil.directions("out"
- if (dir == 0) srcColumn else tgtColumn
- }
-
- def tgtColumnWithDir(dir: Int) = {
- // GraphUtil.directions("out"
- if (dir == 0) tgtColumn else srcColumn
- }
-
- lazy val tgtSrc = (tgtColumn, srcColumn)
- lazy val srcTgt = (srcColumn, tgtColumn)
-
- def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt
-
- lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))
-// def init() = {
-// metas()
-// metaSeqsToNames()
-// service
-// srcColumn
-// tgtColumn
-// defaultIndex
-// indices
-// metaProps
-// }
-
- // def srcColumnInnerVal(jsValue: JsValue) = {
- // jsValueToInnerVal(jsValue, srcColumnType, version)
- // }
- // def tgtColumnInnerVal(jsValue: JsValue) = {
- // jsValueToInnerVal(jsValue, tgtColumnType, version)
- // }
-
- override def toString(): String = {
- val orderByKeys = LabelMeta.findAllByLabelId(id.get)
- super.toString() + orderByKeys.toString()
- }
-
- // def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
- // if (scoring.isEmpty) LabelIndex.defaultSeq
- // else {
- // LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
- //
- //// LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
- // }
- // }
- lazy val toJson = {
- val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
- val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false)
- val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get)
- val metaProps = LabelMeta.reservedMetas.map { m =>
- if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
- else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
- else m
- } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)
-
- val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
- val optionsJs = try {
- val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
- if (!obj.value.contains("tokens")) obj
- else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length))
-
- } catch { case e: Exception => Json.obj() }
-
- Json.obj("labelName" -> label,
- "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
- "isDirected" -> isDirected,
- "serviceName" -> serviceName,
- "consistencyLevel" -> consistencyLevel,
- "schemaVersion" -> schemaVersion,
- "isAsync" -> isAsync,
- "compressionAlgorithm" -> compressionAlgorithm,
- "defaultIndex" -> defaultIdx,
- "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
- "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
- "options" -> optionsJs
- )
- }
-
- def propsToInnerValsWithTs(props: Map[String, Any],
- ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = {
- for {
- (k, v) <- props
- labelMeta <- metaPropsInvMap.get(k)
- innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
- } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
-
- }
-
- def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
- selectColumns: Map[Byte, Boolean]): Map[String, Any] = {
- if (selectColumns.isEmpty) {
- for {
- (meta, v) <- metaPropsDefaultMapInner ++ props
- } yield {
- meta.name -> innerValToAny(v.innerVal, meta.dataType)
- }
- } else {
- for {
- (k, _) <- selectColumns
- if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
- labelMeta <- metaPropsMap.get(k)
- } yield {
- val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
- labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
deleted file mode 100644
index 1da0e55..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.LabelIndexMutateOption
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsObject, JsString, Json}
-import scalikejdbc._
-
-object LabelIndex extends Model[LabelIndex] {
- val DefaultName = "_PK"
- val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
- val DefaultSeq = 1.toByte
- val MaxOrderSeq = 7
-
- def apply(rs: WrappedResultSet): LabelIndex = {
- LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
- rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
- case metaSeqsList => metaSeqsList
- },
- rs.string("formulars"),
- rs.intOpt("dir"),
- rs.stringOpt("options")
- )
- }
-
- case class LabelIndexMutateOption(dir: Byte,
- method: String,
- rate: Double,
- totalModular: Long,
- storeDegree: Boolean) {
-
- val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"
-
- def sample[T](a: T, hashOpt: Option[Long]): Boolean = {
- if (method == "drop") false
- else if (method == "sample") {
- if (scala.util.Random.nextDouble() < rate) true
- else false
- } else if (method == "hash_sample") {
- val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
- if ((hash.abs % totalModular) / totalModular.toDouble < rate) true
- else false
- } else true
- }
- }
-
- def findById(id: Int)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "id=" + id
- withCache(cacheKey) {
- sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
- }.get
- }
-
- def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
- val cacheKey = "labelId=" + labelId
- if (useCache) {
- withCaches(cacheKey)( sql"""
- select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
- """.map { rs => LabelIndex(rs) }.list.apply)
- } else {
- sql"""
- select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
- """.map { rs => LabelIndex(rs) }.list.apply
- }
- }
-
- def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
- direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
- sql"""
- insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
- values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
- """
- .updateAndReturnGeneratedKey.apply()
- }
-
- def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
- direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
- findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
- case Some(s) => s
- case None =>
- val orders = findByLabelIdAll(labelId, false)
- val seq = (orders.size + 1).toByte
- assert(seq <= MaxOrderSeq)
- val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
- val cacheKeys = List(s"labelId=$labelId:seq=$seq",
- s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
-
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
-
- findByLabelIdAndSeq(labelId, seq).get
- }
- }
-
- def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
- val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
- withCache(cacheKey) {
- sql"""
- select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
- """.map { rs => LabelIndex(rs) }.single.apply
- }
- }
-
- def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
- // val cacheKey = s"labelId=$labelId:seq=$seq"
- val cacheKey = "labelId=" + labelId + ":seq=" + seq
- if (useCache) {
- withCache(cacheKey)( sql"""
- select * from label_indices where label_id = ${labelId} and seq = ${seq}
- """.map { rs => LabelIndex(rs) }.single.apply)
- } else {
- sql"""
- select * from label_indices where label_id = ${labelId} and seq = ${seq}
- """.map { rs => LabelIndex(rs) }.single.apply
- }
- }
-
- def delete(id: Int)(implicit session: DBSession = AutoSession) = {
- val labelIndex = findById(id)
- val seqs = labelIndex.metaSeqs.mkString(",")
- val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
- sql"""delete from label_indices where id = ${id}""".execute.apply()
-
- val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- }
-
- def findAll()(implicit session: DBSession = AutoSession) = {
- val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
- putsToCache(ls.map { x =>
- var cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
- })
- putsToCache(ls.map { x =>
- var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
- (cacheKey -> x)
- })
- putsToCache(ls.map { x =>
- var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
- (cacheKey -> x)
- })
- putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
- val cacheKey = s"labelId=${labelId}"
- (cacheKey -> ls)
- }.toList)
- }
-}
-/**
-mgmt.buildIndex('nameAndAge',Vertex.class)
-.addKey(name,Mapping.TEXT.getParameter())
-.addKey(age,Mapping.TEXT.getParameter())
-.buildMixedIndex("search")
-
-v: {name: abc} - E1: {age: 20}, E2, E3....
-
-Management.createServiceColumn(
- serviceName = serviceName, columnName = "person", columnType = "integer",
- props = Seq(
- Prop("name", "-", "string"),
- Prop("age", "0", "integer"),
- Prop("location", "-", "string")
- )
-)
-
-management.createLabel(
- label = "bought",
- srcServiceName = serviceName, srcColumnName = "person", srcColumnType = "integer",
- tgtServiceName = serviceName, tgtColumnName = "product", tgtColumnType = "integer", idDirected = true,
- serviceName = serviceName,
- indices = Seq(
- Index("PK", Seq("amount", "created_at"), IndexType("mixed", propsMapping: Map[String, String]),
-{"in": {}, "out": {}})
- ),
- props = Seq(
- Prop("amount", "0.0", "double"),
- Prop("created_at", "2000-01-01", "string")
- ),
- consistencyLevel = "strong"
-)
-
-mgmt.buildIndex('PK', Edge.class)
- .addKey(amount, Double)
- .buildCompositeIndex
-
-*/
-case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
- dir: Option[Int], options: Option[String]) {
- // both
- lazy val label = Label.findById(labelId)
- lazy val metas = label.metaPropsMap
- lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
- lazy val sortKeyTypesArray = sortKeyTypes.toArray
- lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
-
- lazy val toJson = {
- val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
- val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }
-
- Json.obj(
- "name" -> name,
- "propNames" -> sortKeyTypes.map(x => x.name),
- "dir" -> dirJs,
- "options" -> optionsJs
- )
- }
-
- def parseOption(dir: String): Option[LabelIndexMutateOption] = try {
- options.map { string =>
- val jsObj = Json.parse(string) \ dir
-
- val method = (jsObj \ "method").asOpt[String].getOrElse("default")
- val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
- val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
- val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
-
- LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
- }
- } catch {
- case e: Exception =>
- logger.error(s"Parse failed labelOption: ${this.label}", e)
- None
- }
-
- lazy val inDirOption = parseOption("in")
-
- lazy val outDirOption = parseOption("out")
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
deleted file mode 100644
index 3f54f49..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-/**
- * Created by shon on 6/3/15.
- */
-
-import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
-import org.apache.s2graph.core.{GraphExceptions, JSONParser}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-import scala.util.Try
-
-object LabelMeta extends Model[LabelMeta] {
-
- /** dummy sequences */
-
- val fromSeq = (-4).toByte
- val toSeq = (-5).toByte
- val lastOpSeq = (-3).toByte
- val lastDeletedAtSeq = (-2).toByte
- val timestampSeq = (0).toByte
- val labelSeq = (-6).toByte
- val directionSeq = -7.toByte
- val fromHashSeq = -8.toByte
-
- val countSeq = (Byte.MaxValue - 2).toByte
- val degreeSeq = (Byte.MaxValue - 1).toByte
- val maxValue = Byte.MaxValue
- val emptySeq = Byte.MaxValue
-
- /** reserved sequences */
- // val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
- // seq = lastDeletedAt, defaultValue = "", dataType = "long")
- val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash",
- seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
- val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
- seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
- val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
- seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
- val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
- seq = timestampSeq, defaultValue = "0", dataType = "long")
- val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
- seq = degreeSeq, defaultValue = "0", dataType = "long")
- val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
- seq = countSeq, defaultValue = "-1", dataType = "long")
- val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt",
- seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
- val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
- seq = labelSeq, defaultValue = "", dataType = "string")
- val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
- seq = directionSeq, defaultValue = "out", dataType = "string")
- val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
- seq = emptySeq, defaultValue = "-1", dataType = "long")
-
- // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority
- val reservedMetas = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
- val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count)
- val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet
-
- val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp")
-
- def apply(rs: WrappedResultSet): LabelMeta = {
- LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
- rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index"))
- }
-
- /** Note: DegreeSeq should not be included in serializer/deserializer.
- * only 0 <= seq <= CountSeq(Int.MaxValue - 2), not DegreeSet(Int.MaxValue - 1) should be
- * included in actual bytes in storage.
- * */
- def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq
-
- def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq
-
- def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
- val cacheKey = "id=" + id
-
- withCache(cacheKey) {
- sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
- }.get
- }
-
- def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
- val cacheKey = "labelId=" + labelId
- lazy val labelMetas = sql"""select *
- from label_metas
- where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
-
- if (useCache) withCaches(cacheKey)(labelMetas)
- else labelMetas
- }
-
- def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
- name match {
- case timestamp.name => Some(timestamp)
- case from.name => Some(from)
- case to.name => Some(to)
- case _ =>
- val cacheKey = "labelId=" + labelId + ":name=" + name
- lazy val labelMeta = sql"""
- select *
- from label_metas where label_id = ${labelId} and name = ${name}"""
- .map { rs => LabelMeta(rs) }.single.apply()
-
- if (useCache) withCache(cacheKey)(labelMeta)
- else labelMeta
- }
- }
-
- def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
- val ls = findAllByLabelId(labelId, useCache = false)
- val seq = ls.size + 1
-
- if (seq < maxValue) {
- sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
- select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
- } else {
- throw MaxPropSizeReachedException("max property size reached")
- }
- }
-
- def findOrInsert(labelId: Int,
- name: String,
- defaultValue: String,
- dataType: String,
- storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = {
-
- findByName(labelId, name) match {
- case Some(c) => c
- case None =>
- insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
- val cacheKey = "labelId=" + labelId + ":name=" + name
- val cacheKeys = "labelId=" + labelId
- expireCache(cacheKey)
- expireCaches(cacheKeys)
- findByName(labelId, name, useCache = false).get
- }
- }
-
- def delete(id: Int)(implicit session: DBSession = AutoSession) = {
- val labelMeta = findById(id)
- val (labelId, name) = (labelMeta.labelId, labelMeta.name)
- sql"""delete from label_metas where id = ${id}""".execute.apply()
- val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- }
-
- def findAll()(implicit session: DBSession = AutoSession) = {
- val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- cacheKey -> x
- })
- putsToCache(ls.map { x =>
- val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
- cacheKey -> x
- })
- putsToCache(ls.map { x =>
- val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
- cacheKey -> x
- })
-
- putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
- val cacheKey = s"labelId=${labelId}"
- cacheKey -> ls
- }.toList)
-
- ls
- }
-
- def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
- sql"""
- update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
- """.updateAndReturnGeneratedKey.apply()
- }
-}
-
-case class LabelMeta(id: Option[Int],
- labelId: Int,
- name: String,
- seq: Byte,
- defaultValue: String,
- dataType: String,
- storeInGlobalIndex: Boolean = false) {
-
- lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType, "storeInGlobalIndex" -> storeInGlobalIndex)
-
- override def equals(other: Any): Boolean = {
- if (!other.isInstanceOf[LabelMeta]) false
- else {
- val o = other.asInstanceOf[LabelMeta]
-// labelId == o.labelId &&
- seq == o.seq
- }
- }
- override def hashCode(): Int = seq.toInt
-// (labelId, seq).hashCode()
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
deleted file mode 100644
index e21072e..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.concurrent.Executors
-import java.util.concurrent.atomic.AtomicLong
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
-import play.api.libs.json.{JsObject, JsValue, Json}
-import scalikejdbc._
-
-import scala.concurrent.ExecutionContext
-import scala.io.Source
-import scala.language.{higherKinds, implicitConversions}
-import scala.util.{Failure, Success, Try}
-
-object Model {
- var maxSize = 10000
- var ttl = 60
- val numOfThread = Runtime.getRuntime.availableProcessors()
- val threadPool = Executors.newFixedThreadPool(numOfThread)
- val ec = ExecutionContext.fromExecutor(threadPool)
- val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
-
- private val ModelReferenceCount = new AtomicLong(0L)
-
- def apply(config: Config) = {
- maxSize = config.getInt("cache.max.size")
- ttl = config.getInt("cache.ttl.seconds")
- Class.forName(config.getString("db.default.driver"))
-
- val settings = ConnectionPoolSettings(
- initialSize = 10,
- maxSize = 10,
- connectionTimeoutMillis = 30000L,
- validationQuery = "select 1;")
-
- ConnectionPool.singleton(
- config.getString("db.default.url"),
- config.getString("db.default.user"),
- config.getString("db.default.password"),
- settings)
-
- checkSchema()
-
- ModelReferenceCount.incrementAndGet()
- }
-
- def checkSchema(): Unit = {
- withTx { implicit session =>
- sql"""show tables""".map(rs => rs.string(1)).list.apply()
- } match {
- case Success(tables) =>
- if (tables.isEmpty) {
- // this is a very simple migration tool that only supports creating
- // appropriate tables when there are no tables in the database at all.
- // Ideally, it should be improved to a sophisticated migration tool
- // that supports versioning, etc.
- logger.info("Creating tables ...")
- val schema = getClass.getResourceAsStream("schema.sql")
- val lines = Source.fromInputStream(schema, "UTF-8").getLines
- val sources = lines.map(_.split("-- ").head.trim).mkString("\n")
- val statements = sources.split(";\n")
- withTx { implicit session =>
- statements.foreach(sql => session.execute(sql))
- } match {
- case Success(_) =>
- logger.info("Successfully imported schema")
- case Failure(e) =>
- throw new RuntimeException("Error while importing schema", e)
- }
- }
- case Failure(e) =>
- throw new RuntimeException("Could not list tables in the database", e)
- }
- }
-
- def withTx[T](block: DBSession => T): Try[T] = {
- using(DB(ConnectionPool.borrow())) { conn =>
- Try {
- conn.begin()
- val session = conn.withinTxSession()
- val result = block(session)
-
- conn.commit()
-
- result
- } recoverWith {
- case e: Exception =>
- conn.rollbackIfActive()
- Failure(e)
- }
- }
- }
-
- def shutdown(modelDataDelete: Boolean = false) =
- if (ModelReferenceCount.decrementAndGet() <= 0) {
- // FIXME: When Model is served by embedded database and deleteData is set, Model deletes
- // the underlying database. Its purpose is clearing runtime footprint when running tests.
- if (modelDataDelete) {
- withTx { implicit session =>
- sql"SHOW TABLES"
- .map(rs => rs.string(1))
- .list
- .apply()
- .map { table => s"TRUNCATE TABLE $table" }
- } match {
- case Success(stmts) =>
- val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ List("SET FOREIGN_KEY_CHECKS = 1")
- withTx { implicit session =>
- newStmts.foreach { stmt =>
- session.execute(stmt)
- }
- } match {
- case Success(_) =>
- logger.info(s"Success to truncate models: $stmts")
- case Failure(e) =>
- throw new IllegalStateException(s"Failed to truncate models", e)
- }
- case Failure(e) =>
- throw new IllegalStateException(s"Failed to list models", e)
- }
- }
- clearCache()
- ConnectionPool.closeAll()
- }
-
- def loadCache() = {
- Service.findAll()
- ServiceColumn.findAll()
- Label.findAll()
- LabelMeta.findAll()
- LabelIndex.findAll()
- ColumnMeta.findAll()
- }
-
- def clearCache() = {
- Service.expireAll()
- ServiceColumn.expireAll()
- Label.expireAll()
- LabelMeta.expireAll()
- LabelIndex.expireAll()
- ColumnMeta.expireAll()
- }
-
- def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
- case None => Map.empty
- case Some(v) =>
- try {
- Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
- } catch {
- case e: Exception =>
- logger.error(s"An error occurs while parsing the extra label option", e)
- Map.empty
- }
- }
-
- def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
- try {
- options.get("storage").map { jsValue =>
- import scala.collection.JavaConverters._
- val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
- key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!"))
- }
- ConfigFactory.parseMap(configMap.asJava)
- }
- } catch {
- case e: Exception =>
- logger.error(s"toStorageConfig error. use default storage", e)
- None
- }
- }
-}
-
-trait Model[V] extends SQLSyntaxSupport[V] {
-
- import Model._
-
- implicit val ec: ExecutionContext = Model.ec
-
- val cName = this.getClass.getSimpleName()
- logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")
-
- val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
- val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)
-
- val withCache = optionCache.withCache _
-
- val withCaches = listCache.withCache _
-
- val expireCache = optionCache.invalidate _
-
- val expireCaches = listCache.invalidate _
-
- def expireAll() = {
- listCache.invalidateAll()
- optionCache.invalidateAll()
- }
-
- def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
- case (key, value) => optionCache.put(key, Option(value))
- }
-
- def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
- case (key, values) => listCache.put(key, values)
- }
-
- def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = {
- (optionCache.getAllData(), listCache.getAllData())
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
deleted file mode 100644
index 5b4f494..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.UUID
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object Service extends Model[Service] {
- def valueOf(rs: WrappedResultSet): Service = {
- Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
- rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
- }
-
- def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
- val cacheKey = s"accessToken=$accessToken"
- withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
- }
-
- def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
- val cacheKey = "id=" + id
- withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
- }
-
- def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
- val cacheKey = "serviceName=" + serviceName
- lazy val serviceOpt = sql"""
- select * from services where service_name = ${serviceName}
- """.map { rs => Service.valueOf(rs) }.single.apply()
-
- if (useCache) withCache(cacheKey)(serviceOpt)
- else serviceOpt
- }
-
- def insert(serviceName: String, cluster: String,
- hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
- compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
- logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
- val accessToken = UUID.randomUUID().toString()
- sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
- values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
- }
-
- def delete(id: Int)(implicit session: DBSession = AutoSession) = {
- val service = findById(id)
- val serviceName = service.serviceName
- sql"""delete from service_columns where id = ${id}""".execute.apply()
- val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
- cacheKeys.foreach { key =>
- expireCache(key)
- expireCaches(key)
- }
- }
-
- def findOrInsert(serviceName: String, cluster: String, hTableName: String,
- preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Service = {
- findByName(serviceName, useCache) match {
- case Some(s) => s
- case None =>
- insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
- val cacheKey = "serviceName=" + serviceName
- expireCache(cacheKey)
- findByName(serviceName).get
- }
- }
-
- def findAll()(implicit session: DBSession = AutoSession) = {
- val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
- putsToCache(ls.map { x =>
- val cacheKey = s"id=${x.id.get}"
- (cacheKey -> x)
- })
-
- putsToCache(ls.map { x =>
- val cacheKey = s"serviceName=${x.serviceName}"
- (cacheKey -> x)
- })
-
- ls
- }
-
- def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
- sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
- }
-}
-
-case class Service(id: Option[Int],
- serviceName: String,
- accessToken: String,
- cluster: String,
- hTableName: String,
- preSplitSize: Int,
- hTableTTL: Option[Int],
- options: Option[String] = None) {
- lazy val toJson =
- id match {
- case Some(_id) =>
- Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
- "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
- case None =>
- Json.parse("{}")
- }
-
- lazy val extraOptions = Model.extraOptions(options)
- lazy val storageConfigOpt: Option[Config] = toStorageConfig
- def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
- def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
deleted file mode 100644
index e8bec06..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-/**
- * Query and cache helpers for rows of the `service_columns` table.
- *
- * Single-row lookups go through `withCache`, the per-service column list goes
- * through `withCaches`; both are inherited from `Model`.
- */
-object ServiceColumn extends Model[ServiceColumn] {
-  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
-
-  /** Key under which `findByServiceId` caches the column list of a service. */
-  private def serviceIdCacheKey(serviceId: Int): String = "serviceId=" + serviceId
-
-  /** Key under which `find` caches a single column. */
-  private def columnCacheKey(serviceId: Int, columnName: String): String =
-    "serviceId=" + serviceId + ":columnName=" + columnName
-
-  /** Builds a `ServiceColumn` from a result-set row; `column_type` is lower-cased. */
-  def valueOf(rs: WrappedResultSet): ServiceColumn = {
-    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
-  }
-
-  /**
-   * All columns whose `service_id` equals `serviceId`.
-   *
-   * @param useCache when true the result is served from / stored into the list cache
-   */
-  def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
-    lazy val columns = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply()
-
-    if (useCache) withCaches(serviceIdCacheKey(serviceId))(columns)
-    else columns
-  }
-
-  /**
-   * The column with the given primary key.
-   *
-   * Calls `.get` on the lookup result, so it throws when no such row exists.
-   */
-  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    // Declared lazily so the query is written once and only runs on the branch that needs it.
-    lazy val column = sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply
-
-    if (useCache) withCache("id=" + id)(column).get
-    else column.get
-  }
-
-  /** The column named `columnName` of service `serviceId`, if any. */
-  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
-    lazy val column =
-      sql"""
-        select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
-      """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
-
-    if (useCache) withCache(columnCacheKey(serviceId, columnName))(column)
-    else column
-  }
-
-  /** Inserts a new row; does not touch any cache. */
-  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
-    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
-         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
-  }
-
-  /**
-   * Deletes the row with the given id and expires every cache key that may
-   * still hold it, including the per-service column list.
-   */
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val serviceColumn = findById(id, useCache = false)
-    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
-    sql"""delete from service_columns where id = ${id}""".execute.apply()
-    // serviceIdCacheKey is the key findByServiceId caches under; without it the
-    // cached column list of the service would keep returning the deleted column.
-    val cacheKeys = List(s"id=$id", columnCacheKey(serviceId, columnName), serviceIdCacheKey(serviceId))
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  /**
-   * Returns the existing column, or inserts it and returns the stored row.
-   *
-   * After an insert the single-column key and the per-service list key are
-   * expired so later cached reads see the new row.
-   */
-  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    find(serviceId, columnName, useCache) match {
-      case Some(sc) => sc
-      case None =>
-        insert(serviceId, columnName, columnType, schemaVersion)
-        expireCache(columnCacheKey(serviceId, columnName))
-        // The column list of this service may already be cached without the new row.
-        expireCaches(serviceIdCacheKey(serviceId))
-        find(serviceId, columnName).get
-    }
-  }
-
-  /**
-   * Reads every row and puts each column into the cache under its `id=` key
-   * and its `serviceId=...:columnName=...` key.
-   */
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
-      (cacheKey -> x)
-    })
-    ls
-  }
-}
-/**
- * One row of the `service_columns` table plus lazily computed lookups over
- * the column's metas.
- *
- * `metas` and `metasWithoutCache` call `id.get`, so they throw when `id` is `None`.
- */
-case class ServiceColumn(id: Option[Int],
-                         serviceId: Int,
-                         columnName: String,
-                         columnType: String,
-                         schemaVersion: String) {
-
-  /** The owning service, resolved through `Service.findById`. */
-  lazy val service = Service.findById(serviceId)
-
-  /** Same shape as `metas`, but passes `false` as second argument to `findAllByColumn`. */
-  lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn
-
-  /** `ColumnMeta.timestamp`, then this column's metas, then `ColumnMeta.lastModifiedAtColumn`. */
-  lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn
-
-  /** Metas keyed by `seq` (as Int). */
-  lazy val metasMap = metas.map(meta => meta.seq.toInt -> meta).toMap
-
-  /** Metas keyed by name. */
-  lazy val metasInvMap = metas.map(meta => meta.name -> meta).toMap
-
-  /** Meta names keyed by `seq` (as Int). */
-  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(meta => meta.seq.toInt -> meta.name).toMap
-
-  /** Each meta mapped to the inner value of its `defaultValue`. */
-  lazy val metaPropsDefaultMap = metas.map { meta =>
-    meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion)
-  }.toMap
-
-  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
-
-  /**
-   * Converts name-keyed properties to meta-keyed inner values.
-   * Entries whose name is not in `metasInvMap` are dropped.
-   */
-  def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
-    val converted = for {
-      (propName, propValue) <- props
-      columnMeta <- metasInvMap.get(propName)
-    } yield columnMeta -> toInnerVal(propValue, columnMeta.dataType, schemaVersion)
-
-    converted
-  }
-
-  /**
-   * Converts seq-keyed inner values to name-keyed plain values.
-   * Entries whose seq is not in `metasMap` are dropped.
-   */
-  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] =
-    for {
-      (metaSeq, innerVal) <- props
-      columnMeta <- metasMap.get(metaSeq)
-    } yield columnMeta.name -> innerVal.value
-
-  /**
-   * Same as `innerValsToProps` for timestamped values; only the wrapped
-   * `innerVal.value` is kept.
-   */
-  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] =
-    for {
-      (metaSeq, withTs) <- props
-      columnMeta <- metasMap.get(metaSeq)
-    } yield columnMeta.name -> withTs.innerVal.value
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
deleted file mode 100644
index 38e1761..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet, _}
-
-/**
- * Query and cache helpers for rows of the `service_column_indices` table.
- *
- * `seq` is allocated per (service, service column) pair by `findOrInsert` and
- * is asserted to stay at or below `MaxOrderSeq`.
- */
-object ServiceColumnIndex extends Model[ServiceColumnIndex] {
-  val dbTableName = "service_column_indices"
-  val DefaultName = "_PK"
-  val DefaultSeq = 1.toByte
-  val MaxOrderSeq = 7
-
-  // A String interpolated into sql"..." is sent as a bind parameter, which is not
-  // valid in table-name position. The table name is a compile-time constant, so it
-  // is safe to embed it as raw SQL syntax.
-  private val tableSyntax = SQLSyntax.createUnsafely(dbTableName)
-
-  /**
-   * Builds a `ServiceColumnIndex` from a result-set row.
-   * `meta_seqs` is a comma-separated list of bytes; empty segments are ignored.
-   */
-  def apply(rs: WrappedResultSet): ServiceColumnIndex = {
-    ServiceColumnIndex(rs.intOpt("id"), rs.int("service_id"), rs.int("service_column_id"),
-      rs.string("name"),
-      rs.byte("seq"),
-      rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList,
-      rs.stringOpt("options")
-    )
-  }
-
-  /** The index with the given primary key; throws when no such row exists. */
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
-    lazy val query = sql"""select * from $tableSyntax where id = ${id}"""
-    withCache(cacheKey) {
-      query.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }.get
-  }
-
-  /** The index of the given column whose `meta_seqs` equals `seqs` joined by commas. */
-  def findBySeqs(serviceId: Int, serviceColumnId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[ServiceColumnIndex] = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seqs=" + seqs.mkString(",")
-    lazy val query =
-      sql"""
-        select * from $tableSyntax where service_id = $serviceId and service_column_id = $serviceColumnId and meta_seqs = ${seqs.mkString(",")}
-      """
-    withCache(cacheKey) {
-      query.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-  /** The index of the given column with the given `seq`, if any. */
-  def findBySeq(serviceId: Int,
-                serviceColumnId: Int,
-                seq: Byte,
-                useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seq=" + seq
-    lazy val query =
-      sql"""
-        select * from $tableSyntax where service_id = $serviceId and service_column_id = $serviceColumnId and seq = ${seq}
-      """
-    if (useCache) {
-      withCache(cacheKey)(query.map { rs => ServiceColumnIndex(rs) }.single.apply)
-    } else {
-      query.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-  /**
-   * All indices of the given service column with `seq > 0`, ordered by `seq`.
-   *
-   * The query is restricted to `serviceColumnId` so that it agrees with the
-   * cache key and with the per-column `seq` allocation in `findOrInsert`.
-   */
-  def findAll(serviceId: Int, serviceColumnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-    lazy val query =
-      sql"""
-        select * from $tableSyntax where service_id = ${serviceId} and service_column_id = ${serviceColumnId} and seq > 0 order by seq ASC
-      """
-    if (useCache) {
-      withCaches(cacheKey)(
-        query.map { rs => ServiceColumnIndex(rs) }.list.apply
-      )
-    } else {
-      // Both branches must map rows with the same row mapper.
-      query.map { rs => ServiceColumnIndex(rs) }.list.apply
-    }
-  }
-
-  /** Inserts a new index row and returns its generated key; does not touch any cache. */
-  def insert(serviceId: Int,
-             serviceColumnId: Int,
-             indexName: String,
-             seq: Byte, metaSeqs: List[Byte], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
-    sql"""
-      insert into $tableSyntax(service_id, service_column_id, name, seq, meta_seqs, options)
-      values (${serviceId}, ${serviceColumnId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${options})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  /**
-   * Returns the index with the given `metaSeqs`, or creates it with the next
-   * free `seq` (number of existing indices + 1) and returns the stored row.
-   */
-  def findOrInsert(serviceId: Int,
-                   serviceColumnId: Int,
-                   indexName: String,
-                   metaSeqs: List[Byte],
-                   options: Option[String])(implicit session: DBSession = AutoSession): ServiceColumnIndex = {
-    findBySeqs(serviceId, serviceColumnId, metaSeqs) match {
-      case Some(s) => s
-      case None =>
-        val orders = findAll(serviceId, serviceColumnId, false)
-        val seq = (orders.size + 1).toByte
-        assert(seq <= MaxOrderSeq)
-        val createdId = insert(serviceId, serviceColumnId, indexName, seq, metaSeqs, options)
-        val cacheKeys = toCacheKeys(createdId.toInt, serviceId, serviceColumnId, seq, metaSeqs)
-
-        cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
-        }
-        findBySeq(serviceId, serviceColumnId, seq).get
-    }
-  }
-
-  /**
-   * Every cache key under which the given index may be stored by the finders
-   * of this object.
-   */
-  def toCacheKeys(id: Int, serviceId: Int, serviceColumnId: Int, seq: Byte, seqs: Seq[Byte]): Seq[String] = {
-    Seq(s"id=$id",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seq=$seq",
-      // Must be rendered exactly like the key in findBySeqs (comma-joined), not as List(...).
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seqs=${seqs.mkString(",")}",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId")
-  }
-
-  /** Deletes the row with the given id and expires all of its cache keys. */
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val me = findById(id)
-    val (serviceId, serviceColumnId, seq) = (me.serviceId, me.serviceColumnId, me.seq)
-    lazy val query = sql"""delete from $tableSyntax where id = ${id}"""
-
-    query.execute.apply()
-
-    val cacheKeys = toCacheKeys(id, serviceId, serviceColumnId, seq, me.metaSeqs)
-
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-//  def findAll()(implicit session: DBSession = AutoSession) = {
-//    val ls = sql"""select * from $dbTableName""".map { rs => ServiceColumnIndex(rs) }.list.apply
-//    val singles = ls.flatMap { x =>
-//      val cacheKeys = toCacheKeys(x.id.get, x.serviceId, x.serviceColumnId, x.seq, x.metaSeqs).dropRight(1)
-//      cacheKeys.map { cacheKey =>
-//        cacheKey -> x
-//      }
-//    }
-//    val multies = ls.groupBy(x => (x.serviceId, x.serviceColumnId)).map { case ((serviceId, serviceColumnId), ls) =>
-//      val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-//      cacheKey -> ls
-//    }.toList
-//
-//    putsToCache(singles)
-//    putsToCaches(multies)
-//
-//  }
-}
-
-/**
- * One row of the `service_column_indices` table.
- *
- * @param id              primary key, if the row mapper found one
- * @param serviceId       value of the `service_id` column
- * @param serviceColumnId value of the `service_column_id` column
- * @param name            index name
- * @param seq             value of the `seq` column
- * @param metaSeqs        bytes parsed from the comma-separated `meta_seqs` column
- * @param options         raw `options` column, if present
- */
-case class ServiceColumnIndex(id: Option[Int],
-                              serviceId: Int,
-                              serviceColumnId: Int,
-                              name: String,
-                              seq: Byte,
-                              metaSeqs: Seq[Byte],
-                              options: Option[String])
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 00ef233..75e9657 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -20,7 +20,7 @@
package org.apache.s2graph.core.parsers
import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
import org.apache.s2graph.core.types.InnerValLike
import org.apache.s2graph.core.{S2EdgeLike}
import org.apache.s2graph.core.JSONParser._
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 6b1ce75..f59abc0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -26,7 +26,7 @@ import com.google.common.cache.CacheBuilder
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.parsers.{Where, WhereParser}
import org.apache.s2graph.core.types._
import play.api.libs.json._
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 460c627..c768d81 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -24,7 +24,7 @@ import java.net.URL
import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
import org.apache.s2graph.core.utils.logger
import play.api.libs.json._
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
new file mode 100644
index 0000000..c88f854
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import play.api.libs.json.{JsValue, Json}
+import scalikejdbc._
+
+import scala.util.Try
+
+/**
+ * Query and cache helpers for rows of the `buckets` table.
+ *
+ * Cache keys are prefixed with `className`; the caching functions come from
+ * the `Schema._` import.
+ */
+object Bucket extends SQLSyntaxSupport[Bucket] {
+  import Schema._
+  val className = Bucket.getClass.getSimpleName
+
+  val rangeDelimiter = "~"
+  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
+  val InActiveModulars = Set("0~0")
+
+  /** Builds a `Bucket` from a result-set row. */
+  def valueOf(rs: WrappedResultSet): Bucket = {
+    Bucket(rs.intOpt("id"),
+      rs.int("experiment_id"),
+      rs.string("modular"),
+      rs.string("http_verb"),
+      rs.string("api_path"),
+      rs.string("request_body"),
+      rs.int("timeout"),
+      rs.string("impression_id"),
+      rs.boolean("is_graph_query"),
+      rs.boolean("is_empty"))
+  }
+
+  /** All buckets of the given experiment, served through the list cache. */
+  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
+    val cacheKey = className + "experimentId=" + experimentId
+
+    withCaches(cacheKey, broadcast = false) {
+      sql"""select * from buckets where experiment_id = $experimentId"""
+        .map { rs => Bucket.valueOf(rs) }.list().apply()
+    }
+  }
+
+  /**
+   * Parses a `"<from>~<to>"` string into a pair of ints.
+   *
+   * @return `None` unless splitting on `rangeDelimiter` yields exactly two parts
+   * @throws NumberFormatException when one of the two parts is not an integer
+   */
+  def toRange(str: String): Option[(Int, Int)] = {
+    val range = str.split(rangeDelimiter)
+    if (range.length == 2) Option((range.head.toInt, range.last.toInt))
+    else None
+  }
+
+  /** The bucket with the given impression id, if any. */
+  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
+    val cacheKey = className + "impressionId=" + impressionId
+
+    lazy val sql = sql"""select * from buckets where impression_id=$impressionId"""
+      .map { rs => Bucket.valueOf(rs)}.single().apply()
+
+    if (useCache) withCache(cacheKey)(sql)
+    else sql
+
+  }
+
+  /**
+   * The bucket with the given primary key; throws when no such row exists.
+   *
+   * @param useCache when false the row is read straight from the database
+   */
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Bucket = {
+    val cacheKey = className + "id=" + id
+    lazy val sql = sql"""select * from buckets where id = $id""".map { rs => Bucket.valueOf(rs)}.single().apply()
+    // NOTE(review): the second argument is presumably the `broadcast` flag seen in `finds` - confirm.
+    if (useCache) withCache(cacheKey, false) { sql }.get
+    else sql.get
+  }
+
+  /**
+   * Updates every column of the row with the given id.
+   *
+   * @return the row as stored after the update, or the failure raised by the
+   *         update statement or the re-read
+   */
+  def update(id: Int,
+             experimentId: Int,
+             modular: String,
+             httpVerb: String,
+             apiPath: String,
+             requestBody: String,
+             timeout: Int,
+             impressionId: String,
+             isGraphQuery: Boolean,
+             isEmpty: Boolean)(implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+        UPDATE buckets set experiment_id = $experimentId, modular = $modular, http_verb = $httpVerb, api_path = $apiPath,
+        request_body = $requestBody, timeout = $timeout, impression_id = $impressionId,
+        is_graph_query = $isGraphQuery, is_empty = $isEmpty WHERE id = $id
+      """
+        .update().apply()
+    }.map { _ =>
+      // Bypass the cache: a cached read could hand back the row as it was before this update.
+      findById(id, useCache = false)
+    }
+  }
+
+  /**
+   * Inserts a new row.
+   *
+   * @return a `Bucket` built from the arguments and the generated key, or the
+   *         failure raised by the insert statement
+   */
+  def insert(experimentId: Int, modular: String, httpVerb: String, apiPath: String,
+             requestBody: String, timeout: Int, impressionId: String,
+             isGraphQuery: Boolean, isEmpty: Boolean)
+            (implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+        INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
+        is_graph_query, is_empty)
+        VALUES (${experimentId}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
+        $isGraphQuery, $isEmpty)
+      """
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Bucket(Some(newId.toInt), experimentId, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
+        isGraphQuery, isEmpty)
+    }
+  }
+}
+
+/**
+ * One row of the `buckets` table.
+ *
+ * `modular` holds a `"<from>~<to>"` range string that `rangeOpt` parses with
+ * `Bucket.toRange`.
+ */
+case class Bucket(id: Option[Int],
+                  experimentId: Int,
+                  modular: String,
+                  httpVerb: String,
+                  apiPath: String,
+                  requestBody: String,
+                  timeout: Int,
+                  impressionId: String,
+                  isGraphQuery: Boolean = true,
+                  isEmpty: Boolean = false) {
+
+  import Bucket._
+
+  /** `modular` parsed into a pair of ints; evaluated on first access. */
+  lazy val rangeOpt = toRange(modular)
+
+  /**
+   * JSON view of this bucket. Only `id`, `experimentId`, `modular`, `httpVerb`,
+   * `requestBody`, `isGraphQuery` and `isEmpty` are included.
+   */
+  def toJson(): JsValue = {
+    Json.obj(
+      "id" -> id,
+      "experimentId" -> experimentId,
+      "modular" -> modular,
+      "httpVerb" -> httpVerb,
+      "requestBody" -> requestBody,
+      "isGraphQuery" -> isGraphQuery,
+      "isEmpty" -> isEmpty)
+  }
+
+}