You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/01/15 09:14:30 UTC

incubator-s2graph git commit: [S2GRAPH-14] Abstract HBase specific methods in Management and Label

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master 6cfbf1d2b -> 66be5c8c3


[S2GRAPH-14] Abstract HBase specific methods in Management and Label

  Remove Management.createService from Label/Service.
  Add Management class and move createService/createLabel/createTable from Management object into Management class.
  Refactor callers of the above methods according to these changes.

JIRA:
  [S2GRAPH-14] https://issues.apache.org/jira/browse/S2GRAPH-14

Pull Request:
  Closes #2


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/66be5c8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/66be5c8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/66be5c8c

Branch: refs/heads/master
Commit: 66be5c8c31e89e9131e88a0e0e8a8e2e2a1b57bc
Parents: 6cfbf1d
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Jan 15 17:10:51 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Jan 15 17:10:51 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../com/kakao/s2graph/core/Management.scala     | 294 ++++++-------------
 .../com/kakao/s2graph/core/mysqls/Label.scala   |  18 +-
 .../com/kakao/s2graph/core/mysqls/Service.scala |   6 +-
 .../kakao/s2graph/core/storage/Storage.scala    |  14 +-
 .../core/storage/hbase/AsynchbaseStorage.scala  |  78 ++++-
 .../core/Integrate/IntegrateCommon.scala        |   6 +-
 .../s2graph/core/TestCommonWithModels.scala     |  14 +-
 .../s2graph/core/parsers/WhereParserTest.scala  |   2 +
 .../s2/counter/core/v1/ExactStorageHBase.scala  |   8 +-
 .../src/main/scala/s2/helper/CounterAdmin.scala |   5 +-
 .../s2/counter/core/RankingCounterSpec.scala    |   5 +-
 s2rest_play/app/Bootstrap.scala                 |   4 +-
 .../app/controllers/AdminController.scala       |  13 +-
 14 files changed, 227 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4ca49c6..188d194 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,8 @@ Release 0.12.1 - unreleased
 
   IMPROVEMENT
 
+    S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON).
+
     S2GRAPH-24: Add counter config for readonly graph (Committed by Jaesang Kim).
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
index 5258a3a..32f7a5a 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
@@ -4,18 +4,11 @@ package com.kakao.s2graph.core
 import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
 import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
 import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.storage.Storage
+import com.kakao.s2graph.core.types.HBaseType._
 import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
-import play.Play
 import play.api.libs.json.Reads._
 import play.api.libs.json._
-
 import scala.util.Try
 
 /**
@@ -36,20 +29,8 @@ object Management extends JSONParser {
 
   import HBaseType._
 
-  val hardLimit = 10000
-  val defaultLimit = 100
-  val defaultCompressionAlgorithm = "gz"
-//    Play.application().configuration().getString("hbase.table.compression.algorithm")
+  val DefaultCompressionAlgorithm = "gz"
 
-  def createService(serviceName: String,
-                    cluster: String, hTableName: String,
-                    preSplitSize: Int, hTableTTL: Option[Int],
-                    compressionAlgorithm: String): Try[Service] = {
-
-    Model withTx { implicit session =>
-      Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
-    }
-  }
 
   def findService(serviceName: String) = {
     Service.findByName(serviceName, useCache = false)
@@ -68,62 +49,7 @@ object Management extends JSONParser {
     Label.updateHTableName(targetLabel.label, newHTableName)
   }
 
-  /**
-   * label
-   */
-  /**
-   * copy label when if oldLabel exist and newLabel do not exist.
-   * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
-   */
-  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
-    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
-    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
-
-    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
-    val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
-
-    createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
-      old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
-      old.isDirected, old.serviceName,
-      allIndices, allProps,
-      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
-  }
-
-  def createLabel(label: String,
-                  srcServiceName: String,
-                  srcColumnName: String,
-                  srcColumnType: String,
-                  tgtServiceName: String,
-                  tgtColumnName: String,
-                  tgtColumnType: String,
-                  isDirected: Boolean = true,
-                  serviceName: String,
-                  indices: Seq[Index],
-                  props: Seq[Prop],
-                  consistencyLevel: String,
-                  hTableName: Option[String],
-                  hTableTTL: Option[Int],
-                  schemaVersion: String = DEFAULT_VERSION,
-                  isAsync: Boolean,
-                  compressionAlgorithm: String = defaultCompressionAlgorithm): Try[Label] = {
-
-    val labelOpt = Label.findByName(label, useCache = false)
-
-    Model withTx { implicit session =>
-      labelOpt match {
-        case Some(l) =>
-          throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
-        case None =>
-          Label.insertAll(label,
-            srcServiceName, srcColumnName, srcColumnType,
-            tgtServiceName, tgtColumnName, tgtColumnType,
-            isDirected, serviceName, indices, props, consistencyLevel,
-            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
 
-          Label.findByName(label, useCache = false).get
-      }
-    }
-  }
 
   def createServiceColumn(serviceName: String,
                           columnName: String,
@@ -326,121 +252,6 @@ object Management extends JSONParser {
     props
   }
 
-  val idTableName = "id"
-  val cf = "a"
-  val idColName = "id"
-  val regionCnt = 10
-
-  def getAdmin(zkAddr: String) = {
-    val conf = HBaseConfiguration.create()
-    conf.set("hbase.zookeeper.quorum", zkAddr)
-    val conn = ConnectionFactory.createConnection(conf)
-    conn.getAdmin
-  }
-
-  def enableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
-  }
-
-  def disableTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-  }
-
-  def dropTable(zkAddr: String, tableName: String) = {
-    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
-    getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
-  }
-
-  //  def deleteEdgesByLabelIds(zkAddr: String,
-  //    tableName: String,
-  //    labelIds: String = "",
-  //    minTs: Long = 0L,
-  //    maxTs: Long = Long.MaxValue,
-  //    include: Boolean = true) = {
-  //    val conf = HBaseConfiguration.create()
-  //    val longTimeout = "1200000"
-  //    conf.set("hbase.rpc.timeout", longTimeout)
-  //    conf.set("hbase.client.operation.timeout", longTimeout)
-  //    conf.set("hbase.client.scanner.timeout.period", longTimeout)
-  //    conf.set("hbase.zookeeper.quorum", zkAddr)
-  //    val conn = HConnectionManager.createConnection(conf)
-  //    val table = conn.getTable(tableName.getBytes)
-  //    var builder = DeleteLabelsArgument.newBuilder()
-  //    val scanner = Scan.newBuilder()
-  //
-  //    scanner.setTimeRange(TimeRange.newBuilder().setFrom(minTs).setTo(maxTs))
-  //    /**
-  //     *  when we clean up all data does not match current database ids
-  //     *  we will delete row completely
-  //     */
-  //    if (!include) scanner.setFilter(ProtobufUtil.toFilter(new FirstKeyOnlyFilter))
-  //
-  //    builder.setScan(scanner)
-  //    for (id <- labelIds.split(",")) {
-  //      builder.addId(id.toInt)
-  //    }
-  //
-  //    val argument = builder.build()
-  //
-  //    val regionStats = table.coprocessorService(classOf[GraphStatService], null, null,
-  //      new Batch.Call[GraphStatService, Long]() {
-  //        override def call(counter: GraphStatService): Long = {
-  //          val controller: ServerRpcController = new ServerRpcController()
-  //          val rpcCallback: BlockingRpcCallback[CountResponse] = new BlockingRpcCallback[CountResponse]()
-  //
-  //          if (include) {
-  //            counter.cleanUpDeleteLabelsRows(controller, argument, rpcCallback)
-  //          } else {
-  //            counter.cleanUpDeleteLabelsRowsExclude(controller, argument, rpcCallback)
-  //          }
-  //
-  //          val response: CountResponse = rpcCallback.get()
-  //          if (controller.failedOnException()) throw controller.getFailedOn()
-  //          if (response != null && response.hasCount()) {
-  //            response.getCount()
-  //          } else {
-  //            0L
-  //          }
-  //        }
-  //      })
-  //
-  //    //    regionStats.map(kv => Bytes.toString(kv._1) -> kv._2) ++ Map("total" -> regionStats.values().sum)
-  //  }
-
-  def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int, ttl: Option[Int],
-                  compressionAlgorithm: String = defaultCompressionAlgorithm) = {
-    logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
-    val admin = getAdmin(zkAddr)
-    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
-    if (!admin.tableExists(TableName.valueOf(tableName))) {
-      try {
-        val desc = new HTableDescriptor(TableName.valueOf(tableName))
-        desc.setDurability(Durability.ASYNC_WAL)
-        for (cf <- cfs) {
-          val columnDesc = new HColumnDescriptor(cf)
-            .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
-            .setBloomFilterType(BloomType.ROW)
-            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
-            .setMaxVersions(1)
-            .setTimeToLive(2147483647)
-            .setMinVersions(0)
-            .setBlocksize(32768)
-            .setBlockCacheEnabled(true)
-          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
-          desc.addFamily(columnDesc)
-        }
-
-        if (regionCount <= 1) admin.createTable(desc)
-        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
-      } catch {
-        case e: Throwable =>
-          logger.error(s"$zkAddr, $tableName failed with $e", e)
-          throw e
-      }
-    } else {
-      logger.info(s"$zkAddr, $tableName, $cf already exist.")
-    }
-  }
 
   /**
    * update label name.
@@ -457,15 +268,102 @@ object Management extends JSONParser {
       }
     }
   }
+}
+
+class Management(graph: Graph) {
+  import Management._
+  val storage = graph.storage
 
-  // we only use murmur hash to distribute row key.
-  def getStartKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount))
+  def createTable(zkAddr: String,
+                  tableName: String,
+                  cfs: List[String],
+                  regionMultiplier: Int,
+                  ttl: Option[Int],
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
+    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+
+  /** HBase specific code */
+  def createService(serviceName: String,
+                    cluster: String, hTableName: String,
+                    preSplitSize: Int, hTableTTL: Option[Int],
+                    compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
+
+    Model withTx { implicit session =>
+      val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
+      /** create hbase table for service */
+      storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+      service
+    }
   }
 
-  def getEndKey(regionCount: Int): Array[Byte] = {
-    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+  /** HBase specific code */
+  def createLabel(label: String,
+                  srcServiceName: String,
+                  srcColumnName: String,
+                  srcColumnType: String,
+                  tgtServiceName: String,
+                  tgtColumnName: String,
+                  tgtColumnType: String,
+                  isDirected: Boolean = true,
+                  serviceName: String,
+                  indices: Seq[Index],
+                  props: Seq[Prop],
+                  consistencyLevel: String,
+                  hTableName: Option[String],
+                  hTableTTL: Option[Int],
+                  schemaVersion: String = DEFAULT_VERSION,
+                  isAsync: Boolean,
+                  compressionAlgorithm: String = "gz"): Try[Label] = {
+
+    val labelOpt = Label.findByName(label, useCache = false)
+
+    Model withTx { implicit session =>
+      labelOpt match {
+        case Some(l) =>
+          throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
+        case None =>
+          /** create all models */
+          val newLabel = Label.insertAll(label,
+            srcServiceName, srcColumnName, srcColumnType,
+            tgtServiceName, tgtColumnName, tgtColumnType,
+            isDirected, serviceName, indices, props, consistencyLevel,
+            hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+
+          /** create hbase table */
+          val service = newLabel.service
+          (hTableName, hTableTTL) match {
+            case (None, None) => // do nothing
+            case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
+            case (Some(hbaseTableName), None) =>
+              // create own hbase table with default ttl on service level.
+              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
+            case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
+              // create own hbase table with own ttl.
+              storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
+          }
+          newLabel
+      }
+    }
   }
 
+  /**
+   * label
+   */
+  /**
+   * copy label when if oldLabel exist and newLabel do not exist.
+   * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
+   */
+  def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+    val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
+    if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+
+    val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
+    val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
 
-}
+    createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
+      old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
+      old.isDirected, old.serviceName,
+      allIndices, allProps,
+      old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
index 1e49abd..005a01e 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
@@ -1,9 +1,5 @@
 package com.kakao.s2graph.core.mysqls
 
-/**
- * Created by shon on 6/3/15.
- */
-
 import com.kakao.s2graph.core.GraphExceptions.ModelNotFoundException
 import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
 import com.kakao.s2graph.core.utils.logger
@@ -141,7 +137,7 @@ object Label extends Model[Label] {
                 hTableTTL: Option[Int],
                 schemaVersion: String,
                 isAsync: Boolean,
-                compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+                compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = {
 
     val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
     val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
@@ -188,18 +184,6 @@ object Label extends Model[Label] {
             }
           }
 
-          /** TODO: */
-          (hTableName, hTableTTL) match {
-            case (None, None) => // do nothing
-            case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
-            case (Some(hbaseTableName), None) =>
-              // create own hbase table with default ttl on service level.
-              Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
-            case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
-              // create own hbase table with own ttl.
-              Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
-          }
-
           val cacheKeys = List(s"id=$createdId", s"label=$labelName")
           val ret = findByName(labelName, useCache = false).get
           putsToCache(cacheKeys.map(k => k -> ret))

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
index edd36df..2840db8 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
@@ -1,8 +1,5 @@
 package com.kakao.s2graph.core.mysqls
 
-/**
- * Created by shon on 6/3/15.
- */
 
 import java.util.UUID
 
@@ -39,12 +36,11 @@ object Service extends Model[Service] {
 
   def insert(serviceName: String, cluster: String,
              hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
-             compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
     logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
     val accessToken = UUID.randomUUID().toString()
     sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
     values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
-    Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
   }
 
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index 79303a5..bff0f3b 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -2,14 +2,16 @@ package com.kakao.s2graph.core.storage
 
 import com.google.common.cache.Cache
 import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.Label
+import com.kakao.s2graph.core.mysqls.{Service, Label}
 import com.kakao.s2graph.core.utils.logger
+import com.typesafe.config.Config
 
 
 import scala.collection.Seq
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
 
-abstract class Storage(implicit ec: ExecutionContext) {
+abstract class Storage(val config: Config)(implicit ec: ExecutionContext) {
 
   def cacheOpt: Option[Cache[Integer, Seq[QueryResult]]]
 
@@ -67,6 +69,12 @@ abstract class Storage(implicit ec: ExecutionContext) {
 
   def flush(): Unit
 
+  def createTable(zkAddr: String,
+                  tableName: String,
+                  cfs: List[String],
+                  regionMultiplier: Int,
+                  ttl: Option[Int],
+                  compressionAlgorithm: String): Unit
 
   def toEdge[K: CanSKeyValue](kv: K,
                               queryParam: QueryParam,
@@ -133,5 +141,7 @@ abstract class Storage(implicit ec: ExecutionContext) {
       }
     }
   }
+
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 74f580e..3ccb473 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -11,6 +11,12 @@ import com.kakao.s2graph.core.types._
 import com.kakao.s2graph.core.utils.{Extensions, logger}
 import com.stumbleupon.async.Deferred
 import com.typesafe.config.Config
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
 import org.hbase.async._
 
 import scala.collection.JavaConversions._
@@ -44,8 +50,8 @@ object AsynchbaseStorage {
   }
 }
 
-class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[Vertex]])
-                       (implicit ec: ExecutionContext) extends Storage {
+class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, Option[Vertex]])
+                       (implicit ec: ExecutionContext) extends Storage(config) {
 
   import AsynchbaseStorage._
 
@@ -737,4 +743,72 @@ class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[V
     Await.result(client.flush().toFuture, timeout)
   }
 
+
+  def createTable(zkAddr: String,
+                  tableName: String,
+                  cfs: List[String],
+                  regionMultiplier: Int,
+                  ttl: Option[Int],
+                  compressionAlgorithm: String): Unit = {
+    logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+    val admin = getAdmin(zkAddr)
+    val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+    if (!admin.tableExists(TableName.valueOf(tableName))) {
+      try {
+        val desc = new HTableDescriptor(TableName.valueOf(tableName))
+        desc.setDurability(Durability.ASYNC_WAL)
+        for (cf <- cfs) {
+          val columnDesc = new HColumnDescriptor(cf)
+            .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+            .setBloomFilterType(BloomType.ROW)
+            .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+            .setMaxVersions(1)
+            .setTimeToLive(2147483647)
+            .setMinVersions(0)
+            .setBlocksize(32768)
+            .setBlockCacheEnabled(true)
+          if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+          desc.addFamily(columnDesc)
+        }
+
+        if (regionCount <= 1) admin.createTable(desc)
+        else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+      } catch {
+        case e: Throwable =>
+          logger.error(s"$zkAddr, $tableName failed with $e", e)
+          throw e
+      }
+    } else {
+      logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+    }
+  }
+
+
+  private def getAdmin(zkAddr: String) = {
+    val conf = HBaseConfiguration.create()
+    conf.set("hbase.zookeeper.quorum", zkAddr)
+    val conn = ConnectionFactory.createConnection(conf)
+    conn.getAdmin
+  }
+  private def enableTable(zkAddr: String, tableName: String) = {
+    getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
+  }
+
+  private def disableTable(zkAddr: String, tableName: String) = {
+    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+  }
+
+  private def dropTable(zkAddr: String, tableName: String) = {
+    getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+    getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
+  }
+
+  private def getStartKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount))
+  }
+
+  private def getEndKey(regionCount: Int): Array[Byte] = {
+    Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
index 0ced48f..e60b824 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
@@ -17,11 +17,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
 
   var graph: Graph = _
   var parser: RequestParser = _
+  var management: Management = _
   var config: Config = _
 
   override def beforeAll = {
     config = ConfigFactory.load()
     graph = new Graph(config)(ExecutionContext.Implicits.global)
+    management = new Management(graph)
     parser = new RequestParser(graph.config)
     initTestData()
   }
@@ -43,7 +45,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
       parser.toServiceElements(jsValue)
 
     val tryRes =
-      Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+      management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
     println(s">> Service created : $createService, $tryRes")
 
     val labelNames = Map(testLabelName -> testLabelNameCreate,
@@ -60,7 +62,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
           val json = Json.parse(create)
           val tryRes = for {
             labelArgs <- parser.toLabelElements(json)
-            label <- (Management.createLabel _).tupled(labelArgs)
+            label <- (management.createLabel _).tupled(labelArgs)
           } yield label
 
           tryRes.get

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
index e2a3363..4fa7b59 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
@@ -18,10 +18,12 @@ trait TestCommonWithModels {
 
   var graph: Graph = _
   var config: Config = _
+  var management: Management = _
 
   def initTests() = {
     config = ConfigFactory.load()
     graph = new Graph(config)(ExecutionContext.Implicits.global)
+    management = new Management(graph)
 
     implicit val session = AutoSession
 
@@ -75,8 +77,8 @@ trait TestCommonWithModels {
 
   def createTestService() = {
     implicit val session = AutoSession
-    Management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
-    Management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
+    management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
+    management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
   }
 
   def deleteTestService() = {
@@ -95,16 +97,16 @@ trait TestCommonWithModels {
 
   def createTestLabel() = {
     implicit val session = AutoSession
-    Management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
+    management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
       isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
 
-    Management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
+    management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
 
-    Management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
+    management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
       isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
 
-    Management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
+    management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
       isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
index 393d8b1..659983c 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
@@ -8,6 +8,8 @@ import play.api.libs.json.Json
 
 class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
   // dummy data for dummy edge
+  initTests()
+  
   import HBaseType.{VERSION1, VERSION2}
 
   val ts = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
index b946f0a..a664de4 100644
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
+++ b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
@@ -1,5 +1,6 @@
 package s2.counter.core.v1
 
+import com.kakao.s2graph.core.Graph
 import com.typesafe.config.Config
 import org.apache.hadoop.hbase.CellUtil
 import org.apache.hadoop.hbase.client._
@@ -17,9 +18,7 @@ import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
-/**
- * Created by hsleep(honeysleep@gmail.com) on 2015. 10. 1..
- */
+
 class ExactStorageHBase(config: Config) extends ExactStorage {
   import ExactStorageHBase._
 
@@ -30,6 +29,8 @@ class ExactStorageHBase(config: Config) extends ExactStorage {
   private[counter] val withHBase = new WithHBase(config)
   private[counter] val hbaseManagement = new Management(config)
 
+
+
   private def getTableName(policy: Counter): String = {
     policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
   }
@@ -280,6 +281,7 @@ class ExactStorageHBase(config: Config) extends ExactStorage {
   override def prepare(policy: Counter): Unit = {
     // create hbase table
     policy.hbaseTable.foreach { table =>
+
       if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
         hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
           ColumnFamily.values.map(_.toString).toList, 1)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
index 8547bc8..3cf9181 100644
--- a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
+++ b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
@@ -1,5 +1,6 @@
 package s2.helper
 
+import com.kakao.s2graph.core.Graph
 import com.kakao.s2graph.core.mysqls.Label
 import com.typesafe.config.Config
 import play.api.libs.json.Json
@@ -18,11 +19,13 @@ class CounterAdmin(config: Config) {
   val s2config = new S2CounterConfig(config)
   val counterModel = new CounterModel(config)
   val graphOp = new GraphOperation(config)
+  val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+  val storageManagement = new com.kakao.s2graph.core.Management(s2graph)
 
   def setupCounterOnGraph(): Unit = {
     // create s2counter service
     val service = "s2counter"
-    com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
+    storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
     // create bucket label
     val label = "s2counter_topK_bucket"
     if (Label.findByName(label, useCache = false).isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
index dd6a6bf..ea67a2a 100644
--- a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
+++ b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
@@ -1,5 +1,6 @@
 package s2.counter.core
 
+import com.kakao.s2graph.core.{Management, Graph}
 import com.kakao.s2graph.core.mysqls.Label
 import com.typesafe.config.ConfigFactory
 import org.specs2.mutable.Specification
@@ -70,7 +71,9 @@ class RankingCounterSpec extends Specification with BeforeAfterAll {
       }
 
       val graphOp = new GraphOperation(config)
-      com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+      val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+      val management = new Management(graph)
+      management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
       val strJs =
         s"""
            |{

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
index a661ac3..a1acb10 100644
--- a/s2rest_play/app/Bootstrap.scala
+++ b/s2rest_play/app/Bootstrap.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.Executors
 import actors.QueueActor
 import com.kakao.s2graph.core.rest.RequestParser
 import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{ExceptionHandler, Graph}
+import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
 import config.Config
 import controllers.{AdminController, ApplicationController}
 import play.api.Application
@@ -18,6 +18,7 @@ import scala.util.Try
 
 object Global extends WithFilters(new GzipFilter()) {
   var s2graph: Graph = _
+  var storageManagement: Management = _
   var s2parser: RequestParser = _
 
   // Application entry point
@@ -32,6 +33,7 @@ object Global extends WithFilters(new GzipFilter()) {
 
     // init s2graph with config
     s2graph = new Graph(config)(ec)
+    storageManagement = new Management(s2graph)
     s2parser = new RequestParser(s2graph.config) // merged config
 
     QueueActor.init(s2graph)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/AdminController.scala b/s2rest_play/app/controllers/AdminController.scala
index 2eb248b..bb73c40 100644
--- a/s2rest_play/app/controllers/AdminController.scala
+++ b/s2rest_play/app/controllers/AdminController.scala
@@ -14,6 +14,7 @@ import scala.util.{Failure, Success, Try}
 object AdminController extends Controller {
 
   import ApplicationController._
+  private val management: Management = com.kakao.s2graph.rest.Global.storageManagement
   private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
 
   /**
@@ -181,9 +182,10 @@ object AdminController extends Controller {
     tryResponse(serviceTry)(_.toJson)
   }
 
+
   def createServiceInner(jsValue: JsValue) = {
     val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue)
-    Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+    management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
   }
 
   /**
@@ -195,9 +197,10 @@ object AdminController extends Controller {
     tryResponse(ret)(_.toJson)
   }
 
+
   def createLabelInner(json: JsValue) = for {
     labelArgs <- requestParser.toLabelElements(json)
-    label <- (Management.createLabel _).tupled(labelArgs)
+    label <- (management.createLabel _).tupled(labelArgs)
   } yield label
 
   /**
@@ -322,7 +325,7 @@ object AdminController extends Controller {
    * @return
    */
   def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
-    val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+    val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
     tryResponse(copyTry)(_.label + "created")
   }
 
@@ -408,7 +411,9 @@ object AdminController extends Controller {
     //    Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
     request.body.asJson.map(_.validate[HTableParams] match {
       case JsSuccess(hTableParams, _) => {
-        Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm))
+        management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
+          hTableParams.preSplitSize, hTableParams.hTableTTL,
+          hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
         logger.info(hTableParams.toString())
         ok(s"HTable was created.")
       }