You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/04/26 03:09:52 UTC

[3/7] incubator-s2graph git commit: - rename mysqls package to schema. - remove GlobalIndex. - add toBytes, fromBytes on Schema.

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
deleted file mode 100644
index 0b16449..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Experiment.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.GraphUtil
-import scalikejdbc._
-
-import scala.util.{Try, Random}
-
-object Experiment extends Model[Experiment] {
-  val ImpressionKey = "S2-Impression-Id"
-  val ImpressionId = "Impression-Id"
-
-  def apply(rs: WrappedResultSet): Experiment = {
-    Experiment(rs.intOpt("id"),
-      rs.int("service_id"),
-      rs.string("name"),
-      rs.string("description"),
-      rs.string("experiment_type"),
-      rs.int("total_modular"))
-  }
-
-  def finds(serviceId: Int)(implicit session: DBSession = AutoSession): List[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId
-    withCaches(cacheKey) {
-      sql"""select * from experiments where service_id = ${serviceId}"""
-        .map { rs => Experiment(rs) }.list().apply()
-    }
-  }
-
-  def findBy(serviceId: Int, name: String)(implicit session: DBSession = AutoSession): Option[Experiment] = {
-    val cacheKey = "serviceId=" + serviceId + ":name=" + name
-    withCache(cacheKey) {
-      sql"""select * from experiments where service_id = ${serviceId} and name = ${name}"""
-        .map { rs => Experiment(rs) }.single.apply
-    }
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Option[Experiment] = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""select * from experiments where id = ${id}"""
-        .map { rs => Experiment(rs) }.single.apply
-    )
-  }
-
-  def insert(service: Service, name: String, description: String, experimentType: String = "t", totalModular: Int = 100)
-            (implicit session: DBSession = AutoSession): Try[Experiment] = {
-    Try {
-      sql"""INSERT INTO experiments(service_id, service_name, `name`, description, experiment_type, total_modular)
-         VALUES(${service.id.get}, ${service.serviceName}, $name, $description, $experimentType, $totalModular)"""
-        .updateAndReturnGeneratedKey().apply()
-    }.map { newId =>
-      Experiment(Some(newId.toInt), service.id.get, name, description, experimentType, totalModular)
-    }
-  }
-}
-
-case class Experiment(id: Option[Int],
-                      serviceId: Int,
-                      name: String,
-                      description: String,
-                      experimentType: String,
-                      totalModular: Int) {
-
-  def buckets = Bucket.finds(id.get)
-
-  def rangeBuckets = for {
-    bucket <- buckets
-    range <- bucket.rangeOpt
-  } yield range -> bucket
-
-
-  def findBucket(uuid: String, impIdOpt: Option[String] = None): Option[Bucket] = {
-    impIdOpt match {
-      case Some(impId) => Bucket.findByImpressionId(impId)
-      case None =>
-        val seed = experimentType match {
-          case "u" => (GraphUtil.murmur3(uuid) % totalModular) + 1
-          case _ => Random.nextInt(totalModular) + 1
-        }
-        findBucket(seed)
-    }
-  }
-
-  def findBucket(uuidMod: Int): Option[Bucket] = {
-    rangeBuckets.find { case ((from, to), bucket) =>
-      from <= uuidMod && uuidMod <= to
-    }.map(_._2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
deleted file mode 100644
index 501a964..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/GlobalIndex.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet}
-import scalikejdbc._
-
-object GlobalIndex extends Model[GlobalIndex] {
-  val vidField = "_vid_"
-  val eidField = "_eid_"
-  val labelField = "_label_"
-  val serviceField = "_service_"
-  val serviceColumnField = "_serviceColumn_"
-  val EdgeType = "edge"
-  val VertexType = "vertex"
-  val hiddenIndexFields = Set(vidField, eidField, labelField, serviceField, serviceColumnField)
-
-//  val IndexName = "global_indices"
-  val VertexIndexName = "global_vertex_index"
-  val EdgeIndexName = "global_edge_index"
-  val TypeName = "test"
-
-  def apply(rs: WrappedResultSet): GlobalIndex = {
-    GlobalIndex(rs.intOpt("id"),
-      rs.string("element_type"),
-      rs.string("prop_names").split(",").sorted,
-      rs.string("index_name"))
-  }
-
-  def findBy(elementType: String, indexName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
-    val cacheKey = s"elementType=$elementType:indexName=$indexName"
-    lazy val sql = sql"""select * from global_indices where element_type = ${elementType} and index_name = $indexName""".map { rs => GlobalIndex(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey){sql}
-    else sql
-  }
-
-  def insert(elementType: String, indexName: String, propNames: Seq[String])(implicit session: DBSession = AutoSession): Long = {
-    val allPropNames = (hiddenIndexFields.toSeq ++ propNames).sorted
-    sql"""insert into global_indices(element_type, prop_names, index_name)
-         values($elementType, ${allPropNames.mkString(",")}, $indexName)"""
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findAll(elementType: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[GlobalIndex] = {
-    lazy val ls = sql"""select * from global_indices where element_type = $elementType""".map { rs => GlobalIndex(rs) }.list.apply
-    if (useCache) {
-      listCache.withCache(s"findAll:elementType=$elementType") {
-        putsToCache(ls.map { globalIndex =>
-          val cacheKey = s"elementType=${globalIndex.elementType}:indexName=${globalIndex.indexName}"
-          cacheKey -> globalIndex
-        })
-        ls
-      }
-    } else {
-      ls
-    }
-  }
-
-  def findGlobalIndex(elementType: String, hasContainers: java.util.List[HasContainer])(implicit session: DBSession = AutoSession): Option[GlobalIndex] = {
-    import scala.collection.JavaConversions._
-    val indices = findAll(elementType, useCache = true)
-    val keys = hasContainers.map(_.getKey)
-
-    val sorted = indices.map { index =>
-      val matched = keys.filter(index.propNamesSet)
-      index -> matched.length
-    }.filter(_._2 > 0).sortBy(_._2 * -1)
-
-    sorted.headOption.map(_._1)
-  }
-
-}
-
-case class GlobalIndex(id: Option[Int],
-                       elementType: String,
-                       propNames: Seq[String],
-                       indexName: String)  {
-  val backendIndexName = indexName + "_" + elementType
-  val backendIndexNameWithType = backendIndexName + "/test1"
-  lazy val propNamesSet = propNames.toSet
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
deleted file mode 100644
index c128163..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Label.scala
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.Calendar
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.GraphExceptions.ModelNotFoundException
-import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.types.{InnerVal, InnerValLike, InnerValLikeWithTs}
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
-import scalikejdbc._
-
-object Label extends Model[Label] {
-
-  val maxHBaseTableNames = 2
-
-  def apply(rs: WrappedResultSet): Label = {
-    Label(Option(rs.int("id")), rs.string("label"),
-      rs.int("src_service_id"), rs.string("src_column_name"), rs.string("src_column_type"),
-      rs.int("tgt_service_id"), rs.string("tgt_column_name"), rs.string("tgt_column_type"),
-      rs.boolean("is_directed"), rs.string("service_name"), rs.int("service_id"), rs.string("consistency_level"),
-      rs.string("hbase_table_name"), rs.intOpt("hbase_table_ttl"), rs.string("schema_version"), rs.boolean("is_async"),
-      rs.string("compressionAlgorithm"), rs.stringOpt("options"))
-  }
-
-  def deleteAll(label: Label)(implicit session: DBSession) = {
-    val id = label.id
-    LabelMeta.findAllByLabelId(id.get, false).foreach { x => LabelMeta.delete(x.id.get) }
-    LabelIndex.findByLabelIdAll(id.get, false).foreach { x => LabelIndex.delete(x.id.get) }
-    Label.delete(id.get)
-  }
-
-
-  def findByName(labelName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Label] = {
-    val cacheKey = "label=" + labelName
-    lazy val labelOpt =
-      sql"""
-        select *
-        from labels
-        where label = ${labelName}
-        and deleted_at is null """.map { rs => Label(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(labelOpt)
-    else labelOpt
-  }
-
-  def insert(label: String,
-             srcServiceId: Int,
-             srcColumnName: String,
-             srcColumnType: String,
-             tgtServiceId: Int,
-             tgtColumnName: String,
-             tgtColumnType: String,
-             isDirected: Boolean,
-             serviceName: String,
-             serviceId: Int,
-             consistencyLevel: String,
-             hTableName: String,
-             hTableTTL: Option[Int],
-             schemaVersion: String,
-             isAsync: Boolean,
-             compressionAlgorithm: String,
-             options: Option[String])(implicit session: DBSession = AutoSession) = {
-    sql"""
-    	insert into labels(label,
-    src_service_id, src_column_name, src_column_type,
-    tgt_service_id, tgt_column_name, tgt_column_type,
-    is_directed, service_name, service_id, consistency_level, hbase_table_name, hbase_table_ttl, schema_version, is_async,
-    compressionAlgorithm, options)
-    	values (${label},
-    ${srcServiceId}, ${srcColumnName}, ${srcColumnType},
-    ${tgtServiceId}, ${tgtColumnName}, ${tgtColumnType},
-    ${isDirected}, ${serviceName}, ${serviceId}, ${consistencyLevel}, ${hTableName}, ${hTableTTL},
-    ${schemaVersion}, ${isAsync}, ${compressionAlgorithm}, ${options})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findByIdOpt(id: Int)(implicit session: DBSession = AutoSession): Option[Label] = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""
-        select 	*
-        from 	labels
-        where 	id = ${id}
-        and deleted_at is null"""
-        .map { rs => Label(rs) }.single.apply())
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)(
-      sql"""
-        select 	*
-        from 	labels
-        where 	id = ${id}
-        and deleted_at is null"""
-        .map { rs => Label(rs) }.single.apply()).get
-  }
-
-  def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "tgtColumnId=" + columnId
-    val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
-          select	*
-          from	labels
-          where	tgt_column_name = ${col.columnName}
-          and service_id = ${col.serviceId}
-          and deleted_at is null
-        """.map { rs => Label(rs) }.list().apply())
-  }
-
-  def findBySrcColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "srcColumnId=" + columnId
-    val col = ServiceColumn.findById(columnId)
-    withCaches(cacheKey)(
-      sql"""
-          select 	*
-          from	labels
-          where	src_column_name = ${col.columnName}
-          and service_id = ${col.serviceId}
-          and deleted_at is null
-        """.map { rs => Label(rs) }.list().apply())
-  }
-
-  def findBySrcServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "srcServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where src_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
-    )
-  }
-
-  def findByTgtServiceId(serviceId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
-    val cacheKey = "tgtServiceId=" + serviceId
-    withCaches(cacheKey)(
-      sql"""select * from labels where tgt_service_id = ${serviceId} and deleted_at is null""".map { rs => Label(rs) }.list().apply
-    )
-  }
-
-  def insertAll(labelName: String, srcServiceName: String, srcColumnName: String, srcColumnType: String,
-                tgtServiceName: String, tgtColumnName: String, tgtColumnType: String,
-                isDirected: Boolean = true,
-                serviceName: String,
-                indices: Seq[Index],
-                metaProps: Seq[Prop],
-                consistencyLevel: String,
-                hTableName: Option[String],
-                hTableTTL: Option[Int],
-                schemaVersion: String,
-                isAsync: Boolean,
-                compressionAlgorithm: String,
-                options: Option[String])(implicit session: DBSession = AutoSession): Label = {
-
-    val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
-    val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
-    val serviceOpt = Service.findByName(serviceName, useCache = false)
-    if (srcServiceOpt.isEmpty) throw new RuntimeException(s"source service $srcServiceName is not created.")
-    if (tgtServiceOpt.isEmpty) throw new RuntimeException(s"target service $tgtServiceName is not created.")
-    if (serviceOpt.isEmpty) throw new RuntimeException(s"service $serviceName is not created.")
-
-    val newLabel = for {
-      srcService <- srcServiceOpt
-      tgtService <- tgtServiceOpt
-      service <- serviceOpt
-    } yield {
-        val srcServiceId = srcService.id.get
-        val tgtServiceId = tgtService.id.get
-        val serviceId = service.id.get
-
-        /* insert serviceColumn */
-        val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType))
-        val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType))
-
-        if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
-        if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
-
-        /* create label */
-        Label.findByName(labelName, useCache = false).getOrElse {
-
-          val createdId = insert(labelName, srcServiceId, srcColumnName, srcColumnType,
-            tgtServiceId, tgtColumnName, tgtColumnType, isDirected, serviceName, serviceId, consistencyLevel,
-            hTableName.getOrElse(service.hTableName), hTableTTL.orElse(service.hTableTTL), schemaVersion, isAsync,
-            compressionAlgorithm, options).toInt
-
-          val labelMetaMap = metaProps.map { case Prop(propName, defaultValue, dataType, storeInGlobalIndex) =>
-            val labelMeta = LabelMeta.findOrInsert(createdId, propName, defaultValue, dataType, storeInGlobalIndex)
-            (propName -> labelMeta.seq)
-          }.toMap ++ LabelMeta.reservedMetas.map (labelMeta => labelMeta.name -> labelMeta.seq).toMap
-
-          if (indices.isEmpty) {
-            // make default index with _PK, _timestamp, 0
-            LabelIndex.findOrInsert(createdId, LabelIndex.DefaultName, LabelIndex.DefaultMetaSeqs.toList, "none", None, None)
-          } else {
-            indices.foreach { index =>
-              val metaSeq = index.propNames.map { name => labelMetaMap(name) }
-              LabelIndex.findOrInsert(createdId, index.name, metaSeq.toList, "none", index.direction, index.options)
-            }
-          }
-
-          val cacheKeys = List(s"id=$createdId", s"label=$labelName")
-          val ret = findByName(labelName, useCache = false).get
-          putsToCache(cacheKeys.map(k => k -> ret))
-          ret
-        }
-      }
-
-    newLabel.getOrElse(throw new RuntimeException("failed to create label"))
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from labels where deleted_at is null""".map { rs => Label(rs) }.list().apply()
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"label=${x.label}"
-      (cacheKey -> x)
-    })
-
-    ls
-  }
-
-  def updateName(oldName: String, newName: String)(implicit session: DBSession = AutoSession) = {
-    logger.info(s"rename label: $oldName -> $newName")
-    sql"""update labels set label = ${newName} where label = ${oldName}""".update.apply()
-  }
-
-  def updateHTableName(labelName: String, newHTableName: String)(implicit session: DBSession = AutoSession) = {
-    logger.info(s"update HTable of label $labelName to $newHTableName")
-    val cnt = sql"""update labels set hbase_table_name = $newHTableName where label = $labelName""".update().apply()
-    val label = Label.findByName(labelName, useCache = false).get
-
-    val cacheKeys = List(s"id=${label.id}", s"label=${label.label}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-    cnt
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val label = findById(id)
-    logger.info(s"delete label: $label")
-    val cnt = sql"""delete from labels where id = ${label.id.get}""".update().apply()
-    val cacheKeys = List(s"id=$id", s"label=${label.label}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-    cnt
-  }
-
-  def markDeleted(label: Label)(implicit session: DBSession = AutoSession) = {
-
-    logger.info(s"mark deleted label: $label")
-    val oldName = label.label
-    val now = Calendar.getInstance().getTime
-    val newName = s"deleted_${now.getTime}_"+ label.label
-    val cnt = sql"""update labels set label = ${newName}, deleted_at = ${now} where id = ${label.id.get}""".update.apply()
-    val cacheKeys = List(s"id=${label.id}", s"label=${oldName}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-    cnt
-  }
-}
-
-case class Label(id: Option[Int], label: String,
-                 srcServiceId: Int, srcColumnName: String, srcColumnType: String,
-                 tgtServiceId: Int, tgtColumnName: String, tgtColumnType: String,
-                 isDirected: Boolean = true, serviceName: String, serviceId: Int, consistencyLevel: String = "strong",
-                 hTableName: String, hTableTTL: Option[Int],
-                 schemaVersion: String, isAsync: Boolean = false,
-                 compressionAlgorithm: String,
-                 options: Option[String]) {
-  def metas(useCache: Boolean = true) = LabelMeta.findAllByLabelId(id.get, useCache = useCache)
-
-  def indices(useCache: Boolean = true) = LabelIndex.findByLabelIdAll(id.get, useCache = useCache)
-
-  //  lazy val firstHBaseTableName = hbaseTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME)
-  lazy val srcService = Service.findById(srcServiceId)
-  lazy val tgtService = Service.findById(tgtServiceId)
-  lazy val service = Service.findById(serviceId)
-  /**
-   * TODO
-   * change this to apply hbase table from target serviceName
-   */
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, service.tableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (Config.HBASE_ZOOKEEPER_QUORUM, hTableName.split(",").headOption.getOrElse(Config.HBASE_TABLE_NAME))
-  //  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").headOption.getOrElse(GraphConnection.getConfVal("hbase.table.name")))
-  lazy val (hbaseZkAddr, hbaseTableName) = (service.cluster, hTableName.split(",").head)
-
-  lazy val srcColumn = ServiceColumn.find(srcServiceId, srcColumnName).getOrElse(throw ModelNotFoundException("Source column not found"))
-  lazy val tgtColumn = ServiceColumn.find(tgtServiceId, tgtColumnName).getOrElse(throw ModelNotFoundException("Target column not found"))
-
-  lazy val defaultIndex = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq)
-
-  //TODO: Make sure this is correct
-
-//  lazy val metas = metas(useCache = true)
-  lazy val indices = LabelIndex.findByLabelIdAll(id.get, useCache = true)
-  lazy val labelMetas = LabelMeta.findAllByLabelId(id.get, useCache = true)
-  lazy val labelMetaSet = labelMetas.toSet
-  lazy val labelMetaMap = (labelMetas ++ LabelMeta.reservedMetas).map(m => m.seq -> m).toMap
-
-  lazy val indicesMap = indices.map(idx => (idx.seq, idx)) toMap
-  lazy val indexSeqsMap = indices.map(idx => (idx.metaSeqs, idx)) toMap
-  lazy val indexNameMap = indices.map(idx => (idx.name, idx)) toMap
-  lazy val extraIndices = indices.filter(idx => defaultIndex.isDefined && idx.id.get != defaultIndex.get.id.get)
-  //      indices filterNot (_.id.get == defaultIndex.get.id.get)
-  lazy val extraIndicesMap = extraIndices.map(idx => (idx.seq, idx)) toMap
-
-  lazy val metaProps = LabelMeta.reservedMetas.map { m =>
-    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
-    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
-    else m
-  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
-  lazy val metaPropsInner = LabelMeta.reservedMetasInner.map { m =>
-    if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
-    else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
-    else m
-  } ::: LabelMeta.findAllByLabelId(id.get, useCache = true)
-
-  lazy val metaPropsMap = metaProps.map(x => (x.seq, x)).toMap
-  lazy val metaPropsInvMap = metaProps.map(x => (x.name, x)).toMap
-  lazy val metaPropNames = metaProps.map(x => x.name)
-  lazy val metaPropNamesMap = metaProps.map(x => (x.seq, x.name)) toMap
-
-  /** this is used only by edgeToProps */
-  lazy val metaPropsDefaultMap = (for {
-    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
-    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
-  } yield prop.name -> jsValue).toMap
-
-  lazy val metaPropsDefaultMapInnerString = (for {
-    prop <- metaPropsInner if LabelMeta.isValidSeq(prop.seq)
-    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
-  } yield prop.name -> innerVal).toMap
-
-  lazy val metaPropsDefaultMapInner = (for {
-    prop <- metaPropsInner
-    innerVal = InnerValLikeWithTs(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), System.currentTimeMillis())
-  } yield prop -> innerVal).toMap
-  lazy val metaPropsDefaultMapInnerSeq = metaPropsDefaultMapInner.toSeq
-  lazy val metaPropsJsValueWithDefault = (for {
-    prop <- metaProps if LabelMeta.isValidSeq(prop.seq)
-    jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType)
-  } yield prop -> jsValue).toMap
-//  lazy val extraOptions = Model.extraOptions(Option("""{
-//    "storage": {
-//      "s2graph.storage.backend": "rocks",
-//      "rocks.db.path": "/tmp/db"
-//    }
-//  }"""))
-
-  lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) {
-    case JsArray(tokens) => tokens.map(_.as[String]).toSet
-    case _ =>
-      logger.error("Invalid token JSON")
-      Set.empty[String]
-  }
-
-  lazy val extraOptions = Model.extraOptions(options)
-
-  lazy val durability = extraOptions.get("durability").map(_.as[Boolean]).getOrElse(true)
-
-  lazy val storageConfigOpt: Option[Config] = toStorageConfig
-
-  def toStorageConfig: Option[Config] = {
-    Model.toStorageConfig(extraOptions)
-  }
-
-
-  def srcColumnWithDir(dir: Int) = {
-    // GraphUtil.directions("out"
-    if (dir == 0) srcColumn else tgtColumn
-  }
-
-  def tgtColumnWithDir(dir: Int) = {
-    // GraphUtil.directions("out"
-    if (dir == 0) tgtColumn else srcColumn
-  }
-
-  lazy val tgtSrc = (tgtColumn, srcColumn)
-  lazy val srcTgt = (srcColumn, tgtColumn)
-
-  def srcTgtColumn(dir: Int) = if (dir == 1) tgtSrc else srcTgt
-
-  lazy val EmptyPropsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(0, schemaVersion), 0))
-//  def init() = {
-//    metas()
-//    metaSeqsToNames()
-//    service
-//    srcColumn
-//    tgtColumn
-//    defaultIndex
-//    indices
-//    metaProps
-//  }
-
-  //  def srcColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, srcColumnType, version)
-  //  }
-  //  def tgtColumnInnerVal(jsValue: JsValue) = {
-  //    jsValueToInnerVal(jsValue, tgtColumnType, version)
-  //  }
-
-  override def toString(): String = {
-    val orderByKeys = LabelMeta.findAllByLabelId(id.get)
-    super.toString() + orderByKeys.toString()
-  }
-
-  //  def findLabelIndexSeq(scoring: List[(Byte, Double)]): Byte = {
-  //    if (scoring.isEmpty) LabelIndex.defaultSeq
-  //    else {
-  //      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //
-  ////      LabelIndex.findByLabelIdAndSeqs(id.get, scoring.map(_._1).sorted).map(_.seq).getOrElse(LabelIndex.defaultSeq)
-  //    }
-  //  }
-  lazy val toJson = {
-    val allIdxs = LabelIndex.findByLabelIdAll(id.get, useCache = false)
-    val defaultIdxOpt = LabelIndex.findByLabelIdAndSeq(id.get, LabelIndex.DefaultSeq, useCache = false)
-    val extraIdxs = allIdxs.filter(idx => defaultIdxOpt.isDefined && idx.id.get != defaultIdxOpt.get.id.get)
-    val metaProps = LabelMeta.reservedMetas.map { m =>
-      if (m == LabelMeta.to) m.copy(dataType = tgtColumnType)
-      else if (m == LabelMeta.from) m.copy(dataType = srcColumnType)
-      else m
-    } ::: LabelMeta.findAllByLabelId(id.get, useCache = false)
-
-    val defaultIdx = defaultIdxOpt.map(x => x.toJson).getOrElse(Json.obj())
-    val optionsJs = try {
-      val obj = options.map(Json.parse).getOrElse(Json.obj()).as[JsObject]
-      if (!obj.value.contains("tokens")) obj
-      else obj ++ Json.obj("tokens" -> obj.value("tokens").as[Seq[String]].map("*" * _.length))
-
-    } catch { case e: Exception => Json.obj() }
-
-    Json.obj("labelName" -> label,
-      "from" -> srcColumn.toJson, "to" -> tgtColumn.toJson,
-      "isDirected" -> isDirected,
-      "serviceName" -> serviceName,
-      "consistencyLevel" -> consistencyLevel,
-      "schemaVersion" -> schemaVersion,
-      "isAsync" -> isAsync,
-      "compressionAlgorithm" -> compressionAlgorithm,
-      "defaultIndex" -> defaultIdx,
-      "extraIndex" -> extraIdxs.map(exIdx => exIdx.toJson),
-      "metaProps" -> metaProps.filter { labelMeta => LabelMeta.isValidSeqForAdmin(labelMeta.seq) }.map(_.toJson),
-      "options" -> optionsJs
-    )
-  }
-
-  def propsToInnerValsWithTs(props: Map[String, Any],
-                             ts: Long = System.currentTimeMillis()): Map[LabelMeta, InnerValLikeWithTs] = {
-    for {
-      (k, v) <- props
-      labelMeta <- metaPropsInvMap.get(k)
-      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
-    } yield labelMeta -> InnerValLikeWithTs(innerVal, ts)
-
-  }
-
-  def innerValsWithTsToProps(props: Map[LabelMeta, InnerValLikeWithTs],
-                             selectColumns: Map[Byte, Boolean]): Map[String, Any] = {
-    if (selectColumns.isEmpty) {
-      for {
-        (meta, v) <- metaPropsDefaultMapInner ++ props
-      } yield {
-        meta.name -> innerValToAny(v.innerVal, meta.dataType)
-      }
-    } else {
-      for {
-        (k, _) <- selectColumns
-        if k != LabelMeta.toSeq && k != LabelMeta.fromSeq
-        labelMeta <- metaPropsMap.get(k)
-      } yield {
-        val v = props.get(labelMeta).orElse(metaPropsDefaultMapInner.get(labelMeta)).get
-        labelMeta.name -> innerValToAny(v.innerVal, labelMeta.dataType)
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
deleted file mode 100644
index 1da0e55..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.GraphUtil
-import org.apache.s2graph.core.mysqls.LabelIndex.LabelIndexMutateOption
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.{JsObject, JsString, Json}
-import scalikejdbc._
-
-object LabelIndex extends Model[LabelIndex] {
-  val DefaultName = "_PK"
-  val DefaultMetaSeqs = Seq(LabelMeta.timestampSeq)
-  val DefaultSeq = 1.toByte
-  val MaxOrderSeq = 7
-
-  def apply(rs: WrappedResultSet): LabelIndex = {
-    LabelIndex(rs.intOpt("id"), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
-      rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
-        case metaSeqsList => metaSeqsList
-      },
-      rs.string("formulars"),
-      rs.intOpt("dir"),
-      rs.stringOpt("options")
-    )
-  }
-
-  case class LabelIndexMutateOption(dir: Byte,
-                                    method: String,
-                                    rate: Double,
-                                    totalModular: Long,
-                                    storeDegree: Boolean) {
-
-    val isBufferIncrement = method == "drop" || method == "sample" || method == "hash_sample"
-
-    def sample[T](a: T, hashOpt: Option[Long]): Boolean = {
-      if (method == "drop") false
-      else if (method == "sample") {
-        if (scala.util.Random.nextDouble() < rate) true
-        else false
-      } else if (method == "hash_sample") {
-        val hash = hashOpt.getOrElse(throw new RuntimeException("hash_sample need _from_hash value"))
-        if ((hash.abs % totalModular) / totalModular.toDouble < rate) true
-        else false
-      } else true
-    }
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey) {
-      sql"""select * from label_indices where id = ${id}""".map { rs => LabelIndex(rs) }.single.apply
-    }.get
-  }
-
-  def findByLabelIdAll(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "labelId=" + labelId
-    if (useCache) {
-      withCaches(cacheKey)( sql"""
-        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
-      """.map { rs => LabelIndex(rs) }.list.apply)
-    } else {
-      sql"""
-        select * from label_indices where label_id = ${labelId} and seq > 0 order by seq ASC
-      """.map { rs => LabelIndex(rs) }.list.apply
-    }
-  }
-
-  def insert(labelId: Int, indexName: String, seq: Byte, metaSeqs: List[Byte], formulars: String,
-             direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
-    sql"""
-    	insert into label_indices(label_id, name, seq, meta_seqs, formulars, dir, options)
-    	values (${labelId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${formulars}, ${direction}, ${options})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findOrInsert(labelId: Int, indexName: String, metaSeqs: List[Byte], formulars: String,
-                   direction: Option[Int], options: Option[String])(implicit session: DBSession = AutoSession): LabelIndex = {
-    findByLabelIdAndSeqs(labelId, metaSeqs, direction) match {
-      case Some(s) => s
-      case None =>
-        val orders = findByLabelIdAll(labelId, false)
-        val seq = (orders.size + 1).toByte
-        assert(seq <= MaxOrderSeq)
-        val createdId = insert(labelId, indexName, seq, metaSeqs, formulars, direction, options)
-        val cacheKeys = List(s"labelId=$labelId:seq=$seq",
-          s"labelId=$labelId:seqs=$metaSeqs:dir=$direction", s"labelId=$labelId:seq=$seq", s"id=$createdId")
-
-        cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
-        }
-
-        findByLabelIdAndSeq(labelId, seq).get
-    }
-  }
-
-  def findByLabelIdAndSeqs(labelId: Int, seqs: List[Byte], direction: Option[Int])(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
-    val cacheKey = "labelId=" + labelId + ":seqs=" + seqs.mkString(",") + ":dir=" + direction
-    withCache(cacheKey) {
-      sql"""
-      select * from label_indices where label_id = ${labelId} and meta_seqs = ${seqs.mkString(",")} and dir = ${direction}
-      """.map { rs => LabelIndex(rs) }.single.apply
-    }
-  }
-
-  def findByLabelIdAndSeq(labelId: Int, seq: Byte, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelIndex] = {
-    //    val cacheKey = s"labelId=$labelId:seq=$seq"
-    val cacheKey = "labelId=" + labelId + ":seq=" + seq
-    if (useCache) {
-      withCache(cacheKey)( sql"""
-      select * from label_indices where label_id = ${labelId} and seq = ${seq}
-      """.map { rs => LabelIndex(rs) }.single.apply)
-    } else {
-      sql"""
-      select * from label_indices where label_id = ${labelId} and seq = ${seq}
-      """.map { rs => LabelIndex(rs) }.single.apply
-    }
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val labelIndex = findById(id)
-    val seqs = labelIndex.metaSeqs.mkString(",")
-    val (labelId, seq) = (labelIndex.labelId, labelIndex.seq)
-    sql"""delete from label_indices where id = ${id}""".execute.apply()
-
-    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:seq=$seq", s"labelId=$labelId:seqs=$seqs:dir=${labelIndex.dir}")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from label_indices""".map { rs => LabelIndex(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      var cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seq=${x.seq}}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"labelId=${x.labelId}:seqs=${x.metaSeqs.mkString(",")}:dir=${x.dir}"
-      (cacheKey -> x)
-    })
-    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
-      val cacheKey = s"labelId=${labelId}"
-      (cacheKey -> ls)
-    }.toList)
-  }
-}
-/**
-mgmt.buildIndex('nameAndAge',Vertex.class)
-.addKey(name,Mapping.TEXT.getParameter())
-.addKey(age,Mapping.TEXT.getParameter())
-.buildMixedIndex("search")
-
-v: {name: abc} - E1: {age: 20}, E2, E3....
-
-Management.createServiceColumn(
-		serviceName = serviceName, columnName = "person", columnType = "integer",
-    props = Seq(
-    	Prop("name", "-", "string"),
-    	Prop("age", "0", "integer"),
-    	Prop("location", "-", "string")
-    )
-)
-
-management.createLabel(
-		label = "bought",
-    srcServiceName = serviceName, srcColumnName = "person", srcColumnType = "integer",
-    tgtServiceName = serviceName, tgtColumnName = "product", tgtColumnType = "integer", idDirected = true,
-    serviceName = serviceName,
-    indices = Seq(
-    	Index("PK", Seq("amount", "created_at"), IndexType("mixed", propsMapping: Map[String, String]),
-{"in": {}, "out": {}})
-    ),
-    props = Seq(
-    	Prop("amount", "0.0", "double"),
-    	Prop("created_at", "2000-01-01", "string")
-    ),
-    consistencyLevel = "strong"
-)
-
-mgmt.buildIndex('PK', Edge.class)
-  .addKey(amount, Double)
-  .buildCompositeIndex
-
-*/
-case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, metaSeqs: Seq[Byte], formulars: String,
-                      dir: Option[Int], options: Option[String]) {
-  // both
-  lazy val label = Label.findById(labelId)
-  lazy val metas = label.metaPropsMap
-  lazy val sortKeyTypes = metaSeqs.flatMap(metaSeq => label.metaPropsMap.get(metaSeq))
-  lazy val sortKeyTypesArray = sortKeyTypes.toArray
-  lazy val propNames = sortKeyTypes.map { labelMeta => labelMeta.name }
-
-  lazy val toJson = {
-    val dirJs = dir.map(GraphUtil.fromDirection).getOrElse("both")
-    val optionsJs = try { options.map(Json.parse).getOrElse(Json.obj()) } catch { case e: Exception => Json.obj() }
-
-    Json.obj(
-      "name" -> name,
-      "propNames" -> sortKeyTypes.map(x => x.name),
-      "dir" -> dirJs,
-      "options" -> optionsJs
-    )
-  }
-
-  def parseOption(dir: String): Option[LabelIndexMutateOption] = try {
-    options.map { string =>
-      val jsObj = Json.parse(string) \ dir
-
-      val method = (jsObj \ "method").asOpt[String].getOrElse("default")
-      val rate = (jsObj \ "rate").asOpt[Double].getOrElse(1.0)
-      val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L)
-      val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true)
-
-      LabelIndexMutateOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree)
-    }
-  } catch {
-    case e: Exception =>
-      logger.error(s"Parse failed labelOption: ${this.label}", e)
-      None
-  }
-
-  lazy val inDirOption = parseOption("in")
-
-  lazy val outDirOption = parseOption("out")
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
deleted file mode 100644
index 3f54f49..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-/**
- * Created by shon on 6/3/15.
- */
-
-import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException
-import org.apache.s2graph.core.{GraphExceptions, JSONParser}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-import scala.util.Try
-
-object LabelMeta extends Model[LabelMeta] {
-
-  /** dummy sequences */
-
-  val fromSeq = (-4).toByte
-  val toSeq = (-5).toByte
-  val lastOpSeq = (-3).toByte
-  val lastDeletedAtSeq = (-2).toByte
-  val timestampSeq = (0).toByte
-  val labelSeq = (-6).toByte
-  val directionSeq = -7.toByte
-  val fromHashSeq = -8.toByte
-
-  val countSeq = (Byte.MaxValue - 2).toByte
-  val degreeSeq = (Byte.MaxValue - 1).toByte
-  val maxValue = Byte.MaxValue
-  val emptySeq = Byte.MaxValue
-
-  /** reserved sequences */
-  //  val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt",
-  //    seq = lastDeletedAt, defaultValue = "", dataType = "long")
-  val fromHash = LabelMeta(id = None, labelId = fromHashSeq, name = "_from_hash",
-    seq = fromHashSeq, defaultValue = fromHashSeq.toString, dataType = "long")
-  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
-    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "string")
-  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
-    seq = toSeq, defaultValue = toSeq.toString, dataType = "string")
-  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
-    seq = timestampSeq, defaultValue = "0", dataType = "long")
-  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
-    seq = degreeSeq, defaultValue = "0", dataType = "long")
-  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
-    seq = countSeq, defaultValue = "-1", dataType = "long")
-  val lastDeletedAt = LabelMeta(id = Some(-1), labelId = -1, name = "_lastDeletedAt",
-    seq = lastDeletedAtSeq, defaultValue = "-1", dataType = "long")
-  val label = LabelMeta(id = Some(-1), labelId = -1, name = "label",
-    seq = labelSeq, defaultValue = "", dataType = "string")
-  val direction = LabelMeta(id = Some(-1), labelId = -1, name = "direction",
-    seq = directionSeq, defaultValue = "out", dataType = "string")
-  val empty = LabelMeta(id = Some(-1), labelId = -1, name = "_empty",
-    seq = emptySeq, defaultValue = "-1", dataType = "long")
-
-  // Each reserved column(_timestamp, timestamp) has same seq number, starts with '_' has high priority
-  val reservedMetas = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse
-  val reservedMetasInner = List(empty, label, direction, lastDeletedAt, from, fromHash, to, degree, timestamp, count)
-  val reservedMetaNamesSet = reservedMetasInner.map(_.name).toSet
-
-  val defaultRequiredMetaNames = Set("from", "_from", "to", "_to", "_from_hash", "label", "direction", "timestamp", "_timestamp")
-
-  def apply(rs: WrappedResultSet): LabelMeta = {
-    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"),
-      rs.string("default_value"), rs.string("data_type").toLowerCase, rs.boolean("store_in_global_index"))
-  }
-
-  /** Note: DegreeSeq should not be included in serializer/deserializer.
-    * only 0 <= seq <= CountSeq(Int.MaxValue - 2), not DegreeSet(Int.MaxValue - 1) should be
-    * included in actual bytes in storage.
-    * */
-  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq // || seq == fromHashSeq
-
-  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq // || seq == fromHashSeq
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
-    val cacheKey = "id=" + id
-
-    withCache(cacheKey) {
-      sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply
-    }.get
-  }
-
-  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
-    val cacheKey = "labelId=" + labelId
-    lazy val labelMetas = sql"""select *
-    		  						from label_metas
-    		  						where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()
-
-    if (useCache) withCaches(cacheKey)(labelMetas)
-    else labelMetas
-  }
-
-  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = {
-    name match {
-      case timestamp.name => Some(timestamp)
-      case from.name => Some(from)
-      case to.name => Some(to)
-      case _ =>
-        val cacheKey = "labelId=" + labelId + ":name=" + name
-        lazy val labelMeta = sql"""
-            select *
-            from label_metas where label_id = ${labelId} and name = ${name}"""
-          .map { rs => LabelMeta(rs) }.single.apply()
-
-        if (useCache) withCache(cacheKey)(labelMeta)
-        else labelMeta
-    }
-  }
-
-  def insert(labelId: Int, name: String, defaultValue: String, dataType: String, storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession) = {
-    val ls = findAllByLabelId(labelId, useCache = false)
-    val seq = ls.size + 1
-
-    if (seq < maxValue) {
-      sql"""insert into label_metas(label_id, name, seq, default_value, data_type, store_in_global_index)
-    select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}, ${storeInGlobalIndex}""".updateAndReturnGeneratedKey.apply()
-    } else {
-      throw MaxPropSizeReachedException("max property size reached")
-    }
-  }
-
-  def findOrInsert(labelId: Int,
-                   name: String,
-                   defaultValue: String,
-                   dataType: String,
-                   storeInGlobalIndex: Boolean = false)(implicit session: DBSession = AutoSession): LabelMeta = {
-
-    findByName(labelId, name) match {
-      case Some(c) => c
-      case None =>
-        insert(labelId, name, defaultValue, dataType, storeInGlobalIndex)
-        val cacheKey = "labelId=" + labelId + ":name=" + name
-        val cacheKeys = "labelId=" + labelId
-        expireCache(cacheKey)
-        expireCaches(cacheKeys)
-        findByName(labelId, name, useCache = false).get
-    }
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val labelMeta = findById(id)
-    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
-    sql"""delete from label_metas where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:name=${x.name}"
-      cacheKey -> x
-    })
-    putsToCache(ls.map { x =>
-      val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}"
-      cacheKey -> x
-    })
-
-    putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) =>
-      val cacheKey = s"labelId=${labelId}"
-      cacheKey -> ls
-    }.toList)
-
-    ls
-  }
-
-  def updateStoreInGlobalIndex(id: Int, storeInGlobalIndex: Boolean)(implicit session: DBSession = AutoSession): Try[Long] = Try {
-    sql"""
-          update label_metas set store_in_global_index = ${storeInGlobalIndex} where id = ${id}
-       """.updateAndReturnGeneratedKey.apply()
-  }
-}
-
-case class LabelMeta(id: Option[Int],
-                     labelId: Int,
-                     name: String,
-                     seq: Byte,
-                     defaultValue: String,
-                     dataType: String,
-                     storeInGlobalIndex: Boolean = false) {
-
-  lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType, "storeInGlobalIndex" -> storeInGlobalIndex)
-
-  override def equals(other: Any): Boolean = {
-    if (!other.isInstanceOf[LabelMeta]) false
-    else {
-      val o = other.asInstanceOf[LabelMeta]
-//      labelId == o.labelId &&
-        seq == o.seq
-    }
-  }
-  override def hashCode(): Int = seq.toInt
-//    (labelId, seq).hashCode()
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
deleted file mode 100644
index e21072e..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.concurrent.Executors
-import java.util.concurrent.atomic.AtomicLong
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.utils.{SafeUpdateCache, logger}
-import play.api.libs.json.{JsObject, JsValue, Json}
-import scalikejdbc._
-
-import scala.concurrent.ExecutionContext
-import scala.io.Source
-import scala.language.{higherKinds, implicitConversions}
-import scala.util.{Failure, Success, Try}
-
-object Model {
-  var maxSize = 10000
-  var ttl = 60
-  val numOfThread = Runtime.getRuntime.availableProcessors()
-  val threadPool = Executors.newFixedThreadPool(numOfThread)
-  val ec = ExecutionContext.fromExecutor(threadPool)
-  val useUTF8Encoding = "?useUnicode=true&characterEncoding=utf8"
-
-  private val ModelReferenceCount = new AtomicLong(0L)
-
-  def apply(config: Config) = {
-    maxSize = config.getInt("cache.max.size")
-    ttl = config.getInt("cache.ttl.seconds")
-    Class.forName(config.getString("db.default.driver"))
-
-    val settings = ConnectionPoolSettings(
-      initialSize = 10,
-      maxSize = 10,
-      connectionTimeoutMillis = 30000L,
-      validationQuery = "select 1;")
-
-    ConnectionPool.singleton(
-      config.getString("db.default.url"),
-      config.getString("db.default.user"),
-      config.getString("db.default.password"),
-      settings)
-
-    checkSchema()
-
-    ModelReferenceCount.incrementAndGet()
-  }
-
-  def checkSchema(): Unit = {
-    withTx { implicit session =>
-      sql"""show tables""".map(rs => rs.string(1)).list.apply()
-    } match {
-      case Success(tables) =>
-        if (tables.isEmpty) {
-          // this is a very simple migration tool that only supports creating
-          // appropriate tables when there are no tables in the database at all.
-          // Ideally, it should be improved to a sophisticated migration tool
-          // that supports versioning, etc.
-          logger.info("Creating tables ...")
-          val schema = getClass.getResourceAsStream("schema.sql")
-          val lines = Source.fromInputStream(schema, "UTF-8").getLines
-          val sources = lines.map(_.split("-- ").head.trim).mkString("\n")
-          val statements = sources.split(";\n")
-          withTx { implicit session =>
-            statements.foreach(sql => session.execute(sql))
-          } match {
-            case Success(_) =>
-              logger.info("Successfully imported schema")
-            case Failure(e) =>
-              throw new RuntimeException("Error while importing schema", e)
-          }
-        }
-      case Failure(e) =>
-        throw new RuntimeException("Could not list tables in the database", e)
-    }
-  }
-
-  def withTx[T](block: DBSession => T): Try[T] = {
-    using(DB(ConnectionPool.borrow())) { conn =>
-      Try {
-        conn.begin()
-        val session = conn.withinTxSession()
-        val result = block(session)
-
-        conn.commit()
-
-        result
-      } recoverWith {
-        case e: Exception =>
-          conn.rollbackIfActive()
-          Failure(e)
-      }
-    }
-  }
-
-  def shutdown(modelDataDelete: Boolean = false) =
-    if (ModelReferenceCount.decrementAndGet() <= 0) {
-      // FIXME: When Model is served by embedded database and deleteData is set, Model deletes
-      // the underlying database. Its purpose is clearing runtime footprint when running tests.
-      if (modelDataDelete) {
-        withTx { implicit session =>
-          sql"SHOW TABLES"
-              .map(rs => rs.string(1))
-              .list
-              .apply()
-              .map { table => s"TRUNCATE TABLE $table" }
-        } match {
-          case Success(stmts) =>
-            val newStmts = List("SET FOREIGN_KEY_CHECKS = 0") ++ stmts ++ List("SET FOREIGN_KEY_CHECKS = 1")
-            withTx { implicit session =>
-              newStmts.foreach { stmt =>
-                session.execute(stmt)
-              }
-            } match {
-              case Success(_) =>
-                logger.info(s"Success to truncate models: $stmts")
-              case Failure(e) =>
-                throw new IllegalStateException(s"Failed to truncate models", e)
-            }
-          case Failure(e) =>
-            throw new IllegalStateException(s"Failed to list models", e)
-        }
-      }
-      clearCache()
-      ConnectionPool.closeAll()
-    }
-
-  def loadCache() = {
-    Service.findAll()
-    ServiceColumn.findAll()
-    Label.findAll()
-    LabelMeta.findAll()
-    LabelIndex.findAll()
-    ColumnMeta.findAll()
-  }
-
-  def clearCache() = {
-    Service.expireAll()
-    ServiceColumn.expireAll()
-    Label.expireAll()
-    LabelMeta.expireAll()
-    LabelIndex.expireAll()
-    ColumnMeta.expireAll()
-  }
-
-  def extraOptions(options: Option[String]): Map[String, JsValue] = options match {
-    case None => Map.empty
-    case Some(v) =>
-      try {
-        Json.parse(v).asOpt[JsObject].map { obj => obj.fields.toMap }.getOrElse(Map.empty)
-      } catch {
-        case e: Exception =>
-          logger.error(s"An error occurs while parsing the extra label option", e)
-          Map.empty
-      }
-  }
-
-  def toStorageConfig(options: Map[String, JsValue]): Option[Config] = {
-    try {
-      options.get("storage").map { jsValue =>
-        import scala.collection.JavaConverters._
-        val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) =>
-          key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!"))
-        }
-        ConfigFactory.parseMap(configMap.asJava)
-      }
-    } catch {
-      case e: Exception =>
-        logger.error(s"toStorageConfig error. use default storage", e)
-        None
-    }
-  }
-}
-
-trait Model[V] extends SQLSyntaxSupport[V] {
-
-  import Model._
-
-  implicit val ec: ExecutionContext = Model.ec
-
-  val cName = this.getClass.getSimpleName()
-  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")
-
-  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
-  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)
-
-  val withCache = optionCache.withCache _
-
-  val withCaches = listCache.withCache _
-
-  val expireCache = optionCache.invalidate _
-
-  val expireCaches = listCache.invalidate _
-
-  def expireAll() = {
-    listCache.invalidateAll()
-    optionCache.invalidateAll()
-  }
-
-  def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
-    case (key, value) => optionCache.put(key, Option(value))
-  }
-
-  def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
-    case (key, values) => listCache.put(key, values)
-  }
-
-  def getAllCacheData() : (List[(String, Option[_])], List[(String, List[_])]) = {
-    (optionCache.getAllData(), listCache.getAllData())
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
deleted file mode 100644
index 5b4f494..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import java.util.UUID
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.utils.logger
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object Service extends Model[Service] {
-  def valueOf(rs: WrappedResultSet): Service = {
-    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
-      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
-  }
-
-  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = s"accessToken=$accessToken"
-    withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service.valueOf(rs) }.single.apply)
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
-    val cacheKey = "id=" + id
-    withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service.valueOf(rs) }.single.apply).get
-  }
-
-  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
-    val cacheKey = "serviceName=" + serviceName
-    lazy val serviceOpt = sql"""
-        select * from services where service_name = ${serviceName}
-      """.map { rs => Service.valueOf(rs) }.single.apply()
-
-    if (useCache) withCache(cacheKey)(serviceOpt)
-    else serviceOpt
-  }
-
-  def insert(serviceName: String, cluster: String,
-             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
-             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
-    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
-    val accessToken = UUID.randomUUID().toString()
-    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
-    values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val service = findById(id)
-    val serviceName = service.serviceName
-    sql"""delete from service_columns where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
-                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Service = {
-    findByName(serviceName, useCache) match {
-      case Some(s) => s
-      case None =>
-        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
-        val cacheKey = "serviceName=" + serviceName
-        expireCache(cacheKey)
-        findByName(serviceName).get
-    }
-  }
-
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from services""".map { rs => Service.valueOf(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      val cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-
-    putsToCache(ls.map { x =>
-      val cacheKey = s"serviceName=${x.serviceName}"
-      (cacheKey -> x)
-    })
-
-    ls
-  }
-
-  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
-    sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply
-  }
-}
-
-case class Service(id: Option[Int],
-                   serviceName: String,
-                   accessToken: String,
-                   cluster: String,
-                   hTableName: String,
-                   preSplitSize: Int,
-                   hTableTTL: Option[Int],
-                   options: Option[String] = None) {
-  lazy val toJson =
-    id match {
-      case Some(_id) =>
-        Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
-          "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
-      case None =>
-        Json.parse("{}")
-    }
-
-  lazy val extraOptions = Model.extraOptions(options)
-  lazy val storageConfigOpt: Option[Config] = toStorageConfig
-  def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache)
-  def toStorageConfig: Option[Config] = Model.toStorageConfig(extraOptions)
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
deleted file mode 100644
index e8bec06..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import org.apache.s2graph.core.JSONParser
-import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
-import play.api.libs.json.Json
-import scalikejdbc._
-
-object ServiceColumn extends Model[ServiceColumn] {
-  val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
-
-  def valueOf(rs: WrappedResultSet): ServiceColumn = {
-    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
-  }
-
-  def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
-    val cacheKey = "serviceId=" + serviceId
-
-    lazy val sql = sql"""select * from service_columns where service_id = ${serviceId}""".map { x => ServiceColumn.valueOf(x) }.list().apply()
-
-    if (useCache) withCaches(cacheKey)(sql)
-    else sql
-  }
-
-  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    val cacheKey = "id=" + id
-
-    if (useCache) {
-      withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply).get
-    } else {
-      sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn.valueOf(x) }.single.apply.get
-    }
-  }
-
-  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
-//    val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
-    val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
-    if (useCache) {
-      withCache(cacheKey) {
-        sql"""
-          select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
-        """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
-      }
-    } else {
-      sql"""
-        select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
-      """.map { rs => ServiceColumn.valueOf(rs) }.single.apply()
-    }
-  }
-  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
-    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
-         values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
-  }
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val serviceColumn = findById(id, useCache = false)
-    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
-    sql"""delete from service_columns where id = ${id}""".execute.apply()
-    val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
-    find(serviceId, columnName, useCache) match {
-      case Some(sc) => sc
-      case None =>
-        insert(serviceId, columnName, columnType, schemaVersion)
-//        val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
-        val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
-        expireCache(cacheKey)
-        find(serviceId, columnName).get
-    }
-  }
-  def findAll()(implicit session: DBSession = AutoSession) = {
-    val ls = sql"""select * from service_columns""".map { rs => ServiceColumn.valueOf(rs) }.list.apply
-    putsToCache(ls.map { x =>
-      var cacheKey = s"id=${x.id.get}"
-      (cacheKey -> x)
-    })
-    putsToCache(ls.map { x =>
-      var cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
-      (cacheKey -> x)
-    })
-    ls
-  }
-}
-case class ServiceColumn(id: Option[Int],
-                         serviceId: Int,
-                         columnName: String,
-                         columnType: String,
-                         schemaVersion: String)  {
-
-  lazy val service = Service.findById(serviceId)
-  lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn
-  lazy val metas = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get) :+ ColumnMeta.lastModifiedAtColumn
-  lazy val metasMap = metas.map { meta => meta.seq.toInt -> meta } toMap
-  lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap
-  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap
-  lazy val metaPropsDefaultMap = metas.map { meta =>
-    meta -> JSONParser.toInnerVal(meta.defaultValue, meta.dataType, schemaVersion)
-  }.toMap
-  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
-
-  def propsToInnerVals(props: Map[String, Any]): Map[ColumnMeta, InnerValLike] = {
-    val ret = for {
-      (k, v) <- props
-      labelMeta <- metasInvMap.get(k)
-      innerVal = toInnerVal(v, labelMeta.dataType, schemaVersion)
-    } yield labelMeta -> innerVal
-
-    ret
-  }
-
-  def innerValsToProps(props: Map[Int, InnerValLike]): Map[String, Any] = {
-    for {
-      (k, v) <- props
-      columnMeta <- metasMap.get(k)
-    } yield {
-      columnMeta.name -> v.value
-    }
-  }
-
-  def innerValsWithTsToProps(props: Map[Int, InnerValLikeWithTs]): Map[String, Any] = {
-    for {
-      (k, v) <- props
-      columnMeta <- metasMap.get(k)
-    } yield {
-      columnMeta.name -> v.innerVal.value
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
deleted file mode 100644
index 38e1761..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumnIndex.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.mysqls
-
-import scalikejdbc.{AutoSession, DBSession, WrappedResultSet, _}
-
-object ServiceColumnIndex extends Model[ServiceColumnIndex] {
-  val dbTableName = "service_column_indices"
-  val DefaultName = "_PK"
-  val DefaultSeq = 1.toByte
-  val MaxOrderSeq = 7
-
-  def apply(rs: WrappedResultSet): ServiceColumnIndex = {
-    ServiceColumnIndex(rs.intOpt("id"), rs.int("service_id"), rs.int("service_column_id"),
-      rs.string("name"),
-      rs.byte("seq"), rs.string("meta_seqs").split(",").filter(_ != "").map(s => s.toByte).toList match {
-        case metaSeqsList => metaSeqsList
-      },
-      rs.stringOpt("options")
-    )
-  }
-
-  def findById(id: Int)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "id=" + id
-    lazy val sql = sql"""select * from $dbTableName where id = ${id}"""
-    withCache(cacheKey) {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }.get
-  }
-
-  def findBySeqs(serviceId: Int, serviceColumnId: Int, seqs: List[Byte])(implicit session: DBSession = AutoSession): Option[ServiceColumnIndex] = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seqs=" + seqs.mkString(",")
-    lazy val sql =
-      sql"""
-      select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and meta_seqs = ${seqs.mkString(",")}
-      """
-    withCache(cacheKey) {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-  def findBySeq(serviceId: Int,
-                serviceColumnId: Int,
-                seq: Byte,
-                useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = "serviceId=" + serviceId + ":serviceColumnId=" + serviceColumnId + ":seq=" + seq
-    lazy val sql =
-      sql"""
-      select * from $dbTableName where service_id = $serviceId and service_column_id = $serviceColumnId and seq = ${seq}
-      """
-    if (useCache) {
-      withCache(cacheKey)(sql.map { rs => ServiceColumnIndex(rs) }.single.apply)
-    } else {
-      sql.map { rs => ServiceColumnIndex(rs) }.single.apply
-    }
-  }
-
-
-  def findAll(serviceId: Int, serviceColumnId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession) = {
-    val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-    lazy val sql =
-      sql"""
-          select * from $dbTableName where service_id = ${serviceId} and seq > 0 order by seq ASC
-        """
-    if (useCache) {
-      withCaches(cacheKey)(
-        sql.map { rs => ServiceColumnIndex(rs) }.list.apply
-      )
-    } else {
-      sql.map { rs => LabelIndex(rs) }.list.apply
-    }
-  }
-
-  def insert(serviceId: Int,
-             serviceColumnId: Int,
-             indexName: String,
-             seq: Byte, metaSeqs: List[Byte], options: Option[String])(implicit session: DBSession = AutoSession): Long = {
-    sql"""
-    	insert into $dbTableName(service_id, service_column_id, name, seq, meta_seqs, options)
-    	values (${serviceId}, ${serviceColumnId}, ${indexName}, ${seq}, ${metaSeqs.mkString(",")}, ${options})
-    """
-      .updateAndReturnGeneratedKey.apply()
-  }
-
-  def findOrInsert(serviceId: Int,
-                   serviceColumnId: Int,
-                   indexName: String,
-                   metaSeqs: List[Byte],
-                   options: Option[String])(implicit session: DBSession = AutoSession): ServiceColumnIndex = {
-    findBySeqs(serviceId, serviceColumnId, metaSeqs) match {
-      case Some(s) => s
-      case None =>
-        val orders = findAll(serviceId, serviceColumnId, false)
-        val seq = (orders.size + 1).toByte
-        assert(seq <= MaxOrderSeq)
-        val createdId = insert(serviceId, serviceColumnId, indexName, seq, metaSeqs, options)
-        val cacheKeys = toCacheKeys(createdId.toInt, serviceId, serviceColumnId, seq, metaSeqs)
-
-        cacheKeys.foreach { key =>
-          expireCache(key)
-          expireCaches(key)
-        }
-        findBySeq(serviceId, serviceColumnId, seq).get
-    }
-  }
-
-  def toCacheKeys(id: Int, serviceId: Int, serviceColumnId: Int, seq: Byte, seqs: Seq[Byte]): Seq[String] = {
-    Seq(s"id=$id",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seq=$seq",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId:seqs=$seqs",
-      s"serviceId=$serviceId:serviceColumnId=$serviceColumnId")
-  }
-
-  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
-    val me = findById(id)
-    val seqs = me.metaSeqs.mkString(",")
-    val (serviceId, serviceColumnId, seq) = (me.serviceId, me.serviceColumnId, me.seq)
-    lazy val sql = sql"""delete from $dbTableName where id = ${id}"""
-
-    sql.execute.apply()
-
-    val cacheKeys = toCacheKeys(id, serviceId, serviceColumnId, seq, me.metaSeqs)
-
-    cacheKeys.foreach { key =>
-      expireCache(key)
-      expireCaches(key)
-    }
-  }
-
-//  def findAll()(implicit session: DBSession = AutoSession) = {
-//    val ls = sql"""select * from $dbTableName""".map { rs => ServiceColumnIndex(rs) }.list.apply
-//    val singles = ls.flatMap { x =>
-//      val cacheKeys = toCacheKeys(x.id.get, x.serviceId, x.serviceColumnId, x.seq, x.metaSeqs).dropRight(1)
-//      cacheKeys.map { cacheKey =>
-//        cacheKey -> x
-//      }
-//    }
-//    val multies = ls.groupBy(x => (x.serviceId, x.serviceColumnId)).map { case ((serviceId, serviceColumnId), ls) =>
-//      val cacheKey = s"serviceId=$serviceId:serviceColumnId=$serviceColumnId"
-//      cacheKey -> ls
-//    }.toList
-//
-//    putsToCache(singles)
-//    putsToCaches(multies)
-//
-//  }
-}
-
-case class ServiceColumnIndex(id: Option[Int],
-                              serviceId: Int,
-                              serviceColumnId: Int,
-                              name: String,
-                              seq: Byte,
-                              metaSeqs: Seq[Byte],
-                              options: Option[String]) {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
index 00ef233..75e9657 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala
@@ -20,7 +20,7 @@
 package org.apache.s2graph.core.parsers
 
 import org.apache.s2graph.core.GraphExceptions.{LabelNotExistException, WhereParserException}
-import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
+import org.apache.s2graph.core.schema.{Label, LabelMeta}
 import org.apache.s2graph.core.types.InnerValLike
 import org.apache.s2graph.core.{S2EdgeLike}
 import org.apache.s2graph.core.JSONParser._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
index 6b1ce75..f59abc0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala
@@ -26,7 +26,7 @@ import com.google.common.cache.CacheBuilder
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls._
+import org.apache.s2graph.core.schema._
 import org.apache.s2graph.core.parsers.{Where, WhereParser}
 import org.apache.s2graph.core.types._
 import play.api.libs.json._

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
index 460c627..c768d81 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala
@@ -24,7 +24,7 @@ import java.net.URL
 import org.apache.s2graph.core.GraphExceptions.{BadQueryException, LabelNotExistException}
 import org.apache.s2graph.core.JSONParser._
 import org.apache.s2graph.core._
-import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service}
+import org.apache.s2graph.core.schema.{Bucket, Experiment, Service}
 import org.apache.s2graph.core.utils.logger
 import play.api.libs.json._
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/8a388a42/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
new file mode 100644
index 0000000..c88f854
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Bucket.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.schema
+
+import play.api.libs.json.{JsValue, Json}
+import scalikejdbc._
+
+import scala.util.Try
+
+/** Row mapper, optionally cached finders and write helpers for the `buckets` table. */
+object Bucket extends SQLSyntaxSupport[Bucket] {
+  import Schema._
+  val className = Bucket.getClass.getSimpleName
+
+  val rangeDelimiter = "~"
+  val INVALID_BUCKET_EXCEPTION = new RuntimeException("invalid bucket.")
+  val InActiveModulars = Set("0~0")
+
+  /** Builds a [[Bucket]] from `rs`, reading every column by name. */
+  def valueOf(rs: WrappedResultSet): Bucket = {
+    Bucket(rs.intOpt("id"),
+      rs.int("experiment_id"),
+      rs.string("modular"),
+      rs.string("http_verb"), rs.string("api_path"),
+      rs.string("request_body"), rs.int("timeout"), rs.string("impression_id"),
+      rs.boolean("is_graph_query"), rs.boolean("is_empty"))
+  }
+
+  /** Lists the buckets of `experimentId`, going through `withCaches` with `broadcast = false`. */
+  def finds(experimentId: Int)(implicit session: DBSession = AutoSession): List[Bucket] = {
+    val cacheKey = className + "experimentId=" + experimentId
+
+    withCaches(cacheKey, broadcast = false) {
+      sql"""select * from buckets where experiment_id = $experimentId"""
+        .map { rs => Bucket.valueOf(rs) }.list().apply()
+    }
+  }
+
+  /** Parses `"<from>~<to>"` into `(from, to)`; `None` unless it has exactly two integer parts. */
+  def toRange(str: String): Option[(Int, Int)] = {
+    val range = str.split(rangeDelimiter)
+    // Try: a non-numeric part (e.g. "~5" or "a~b") yields None instead of NumberFormatException.
+    if (range.length == 2) Try((range.head.toInt, range.last.toInt)).toOption
+    else None
+  }
+
+  /** Finds the bucket with `impressionId`; with `useCache = false` the query is always executed. */
+  def findByImpressionId(impressionId: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Bucket] = {
+    val cacheKey = className + "impressionId=" + impressionId
+    lazy val sql = sql"""select * from buckets where impression_id=$impressionId"""
+      .map { rs => Bucket.valueOf(rs)}.single().apply()
+
+    if (useCache) withCache(cacheKey)(sql)
+    else sql
+  }
+
+  /** Returns the bucket with key `id`; unwrapped with `.get`, so a missing row raises, not None. */
+  def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Bucket = {
+    val cacheKey = className + "id=" + id
+    lazy val sql = sql"""select * from buckets where id = $id""".map { rs => Bucket.valueOf(rs)}.single().apply()
+    if (useCache) withCache(cacheKey, false) { sql }.get
+    else sql.get
+  }
+
+  /**
+   * Overwrites every column of row `id`, then re-reads it with `useCache = false` so the result
+   * is never a copy cached before this UPDATE. NOTE(review): cached entries are not expired here.
+   */
+  def update(id: Int, experimentId: Int, modular: String, httpVerb: String, apiPath: String,
+             requestBody: String, timeout: Int, impressionId: String,
+             isGraphQuery: Boolean, isEmpty: Boolean)
+            (implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+            UPDATE buckets set experiment_id = $experimentId, modular = $modular, http_verb = $httpVerb, api_path = $apiPath,
+            request_body = $requestBody, timeout = $timeout, impression_id = $impressionId,
+            is_graph_query = $isGraphQuery, is_empty = $isEmpty WHERE id = $id
+        """
+        .update().apply()
+    }.map { _ =>
+      findById(id, useCache = false)
+    }
+  }
+
+  /** Inserts a row and returns it as a [[Bucket]] whose `id` is the generated key; errors end up in the `Try`. */
+  def insert(experimentId: Int, modular: String, httpVerb: String, apiPath: String,
+             requestBody: String, timeout: Int, impressionId: String,
+             isGraphQuery: Boolean, isEmpty: Boolean)
+            (implicit session: DBSession = AutoSession): Try[Bucket] = {
+    Try {
+      sql"""
+            INSERT INTO buckets(experiment_id, modular, http_verb, api_path, request_body, timeout, impression_id,
+             is_graph_query, is_empty)
+            VALUES (${experimentId}, $modular, $httpVerb, $apiPath, $requestBody, $timeout, $impressionId,
+             $isGraphQuery, $isEmpty)
+        """
+        .updateAndReturnGeneratedKey().apply()
+    }.map { newId =>
+      Bucket(Some(newId.toInt), experimentId, modular, httpVerb, apiPath, requestBody, timeout, impressionId,
+        isGraphQuery, isEmpty)
+    }
+  }
+}
+
+/** A row of the `buckets` table; `modular` is parsed as a `~`-delimited range by `rangeOpt`. */
+case class Bucket(id: Option[Int], experimentId: Int, modular: String,
+                  httpVerb: String, apiPath: String, requestBody: String,
+                  timeout: Int, impressionId: String,
+                  isGraphQuery: Boolean = true, isEmpty: Boolean = false) {
+
+  /** `modular` parsed with `Bucket.toRange`; evaluated once, on first access. */
+  lazy val rangeOpt = Bucket.toRange(modular)
+
+  /** JSON view of this bucket; `apiPath`, `timeout` and `impressionId` are not included. */
+  def toJson(): JsValue =
+    Json.obj(
+      "id" -> id, "experimentId" -> experimentId, "modular" -> modular,
+      "httpVerb" -> httpVerb, "requestBody" -> requestBody,
+      "isGraphQuery" -> isGraphQuery, "isEmpty" -> isEmpty
+    )
+}