You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/01/15 09:14:30 UTC
incubator-s2graph git commit: [S2GRAPH-14] Abstract HBase specific
methods in Management and Label
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 6cfbf1d2b -> 66be5c8c3
[S2GRAPH-14] Abstract HBase specific methods in Management and Label
Remove Management.createService from Label/Service.
Add Management class and move createService/createLabel/createTable from Management object into Management class.
Refactor caller of above methods according to changes.
JIRA:
[S2GRAPH-14] https://issues.apache.org/jira/browse/S2GRAPH-14
Pull Request:
Closes #2
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/66be5c8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/66be5c8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/66be5c8c
Branch: refs/heads/master
Commit: 66be5c8c31e89e9131e88a0e0e8a8e2e2a1b57bc
Parents: 6cfbf1d
Author: DO YUNG YOON <st...@apache.org>
Authored: Fri Jan 15 17:10:51 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Fri Jan 15 17:10:51 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../com/kakao/s2graph/core/Management.scala | 294 ++++++-------------
.../com/kakao/s2graph/core/mysqls/Label.scala | 18 +-
.../com/kakao/s2graph/core/mysqls/Service.scala | 6 +-
.../kakao/s2graph/core/storage/Storage.scala | 14 +-
.../core/storage/hbase/AsynchbaseStorage.scala | 78 ++++-
.../core/Integrate/IntegrateCommon.scala | 6 +-
.../s2graph/core/TestCommonWithModels.scala | 14 +-
.../s2graph/core/parsers/WhereParserTest.scala | 2 +
.../s2/counter/core/v1/ExactStorageHBase.scala | 8 +-
.../src/main/scala/s2/helper/CounterAdmin.scala | 5 +-
.../s2/counter/core/RankingCounterSpec.scala | 5 +-
s2rest_play/app/Bootstrap.scala | 4 +-
.../app/controllers/AdminController.scala | 13 +-
14 files changed, 227 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4ca49c6..188d194 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,8 @@ Release 0.12.1 - unreleased
IMPROVEMENT
+ S2GRAPH-14: Abstract HBase specific methods in Management and Label (Committed by DOYUNG YOON).
+
S2GRAPH-24: Add counter config for readonly graph (Committed by Jaesang Kim).
BUG FIXES
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
index 5258a3a..32f7a5a 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
@@ -4,18 +4,11 @@ package com.kakao.s2graph.core
import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException}
import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
import com.kakao.s2graph.core.mysqls._
+import com.kakao.s2graph.core.storage.Storage
+import com.kakao.s2graph.core.types.HBaseType._
import com.kakao.s2graph.core.types._
-import com.kakao.s2graph.core.utils.logger
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
-import org.apache.hadoop.hbase.regionserver.BloomType
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
-import play.Play
import play.api.libs.json.Reads._
import play.api.libs.json._
-
import scala.util.Try
/**
@@ -36,20 +29,8 @@ object Management extends JSONParser {
import HBaseType._
- val hardLimit = 10000
- val defaultLimit = 100
- val defaultCompressionAlgorithm = "gz"
-// Play.application().configuration().getString("hbase.table.compression.algorithm")
+ val DefaultCompressionAlgorithm = "gz"
- def createService(serviceName: String,
- cluster: String, hTableName: String,
- preSplitSize: Int, hTableTTL: Option[Int],
- compressionAlgorithm: String): Try[Service] = {
-
- Model withTx { implicit session =>
- Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
- }
- }
def findService(serviceName: String) = {
Service.findByName(serviceName, useCache = false)
@@ -68,62 +49,7 @@ object Management extends JSONParser {
Label.updateHTableName(targetLabel.label, newHTableName)
}
- /**
- * label
- */
- /**
- * copy label when if oldLabel exist and newLabel do not exist.
- * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
- */
- def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
- val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
- if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
-
- val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
- val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
-
- createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
- old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
- old.isDirected, old.serviceName,
- allIndices, allProps,
- old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
- }
-
- def createLabel(label: String,
- srcServiceName: String,
- srcColumnName: String,
- srcColumnType: String,
- tgtServiceName: String,
- tgtColumnName: String,
- tgtColumnType: String,
- isDirected: Boolean = true,
- serviceName: String,
- indices: Seq[Index],
- props: Seq[Prop],
- consistencyLevel: String,
- hTableName: Option[String],
- hTableTTL: Option[Int],
- schemaVersion: String = DEFAULT_VERSION,
- isAsync: Boolean,
- compressionAlgorithm: String = defaultCompressionAlgorithm): Try[Label] = {
-
- val labelOpt = Label.findByName(label, useCache = false)
-
- Model withTx { implicit session =>
- labelOpt match {
- case Some(l) =>
- throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
- case None =>
- Label.insertAll(label,
- srcServiceName, srcColumnName, srcColumnType,
- tgtServiceName, tgtColumnName, tgtColumnType,
- isDirected, serviceName, indices, props, consistencyLevel,
- hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
- Label.findByName(label, useCache = false).get
- }
- }
- }
def createServiceColumn(serviceName: String,
columnName: String,
@@ -326,121 +252,6 @@ object Management extends JSONParser {
props
}
- val idTableName = "id"
- val cf = "a"
- val idColName = "id"
- val regionCnt = 10
-
- def getAdmin(zkAddr: String) = {
- val conf = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum", zkAddr)
- val conn = ConnectionFactory.createConnection(conf)
- conn.getAdmin
- }
-
- def enableTable(zkAddr: String, tableName: String) = {
- getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
- }
-
- def disableTable(zkAddr: String, tableName: String) = {
- getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
- }
-
- def dropTable(zkAddr: String, tableName: String) = {
- getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
- getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
- }
-
- // def deleteEdgesByLabelIds(zkAddr: String,
- // tableName: String,
- // labelIds: String = "",
- // minTs: Long = 0L,
- // maxTs: Long = Long.MaxValue,
- // include: Boolean = true) = {
- // val conf = HBaseConfiguration.create()
- // val longTimeout = "1200000"
- // conf.set("hbase.rpc.timeout", longTimeout)
- // conf.set("hbase.client.operation.timeout", longTimeout)
- // conf.set("hbase.client.scanner.timeout.period", longTimeout)
- // conf.set("hbase.zookeeper.quorum", zkAddr)
- // val conn = HConnectionManager.createConnection(conf)
- // val table = conn.getTable(tableName.getBytes)
- // var builder = DeleteLabelsArgument.newBuilder()
- // val scanner = Scan.newBuilder()
- //
- // scanner.setTimeRange(TimeRange.newBuilder().setFrom(minTs).setTo(maxTs))
- // /**
- // * when we clean up all data does not match current database ids
- // * we will delete row completely
- // */
- // if (!include) scanner.setFilter(ProtobufUtil.toFilter(new FirstKeyOnlyFilter))
- //
- // builder.setScan(scanner)
- // for (id <- labelIds.split(",")) {
- // builder.addId(id.toInt)
- // }
- //
- // val argument = builder.build()
- //
- // val regionStats = table.coprocessorService(classOf[GraphStatService], null, null,
- // new Batch.Call[GraphStatService, Long]() {
- // override def call(counter: GraphStatService): Long = {
- // val controller: ServerRpcController = new ServerRpcController()
- // val rpcCallback: BlockingRpcCallback[CountResponse] = new BlockingRpcCallback[CountResponse]()
- //
- // if (include) {
- // counter.cleanUpDeleteLabelsRows(controller, argument, rpcCallback)
- // } else {
- // counter.cleanUpDeleteLabelsRowsExclude(controller, argument, rpcCallback)
- // }
- //
- // val response: CountResponse = rpcCallback.get()
- // if (controller.failedOnException()) throw controller.getFailedOn()
- // if (response != null && response.hasCount()) {
- // response.getCount()
- // } else {
- // 0L
- // }
- // }
- // })
- //
- // // regionStats.map(kv => Bytes.toString(kv._1) -> kv._2) ++ Map("total" -> regionStats.values().sum)
- // }
-
- def createTable(zkAddr: String, tableName: String, cfs: List[String], regionMultiplier: Int, ttl: Option[Int],
- compressionAlgorithm: String = defaultCompressionAlgorithm) = {
- logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
- val admin = getAdmin(zkAddr)
- val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
- if (!admin.tableExists(TableName.valueOf(tableName))) {
- try {
- val desc = new HTableDescriptor(TableName.valueOf(tableName))
- desc.setDurability(Durability.ASYNC_WAL)
- for (cf <- cfs) {
- val columnDesc = new HColumnDescriptor(cf)
- .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
- .setBloomFilterType(BloomType.ROW)
- .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
- .setMaxVersions(1)
- .setTimeToLive(2147483647)
- .setMinVersions(0)
- .setBlocksize(32768)
- .setBlockCacheEnabled(true)
- if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
- desc.addFamily(columnDesc)
- }
-
- if (regionCount <= 1) admin.createTable(desc)
- else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
- } catch {
- case e: Throwable =>
- logger.error(s"$zkAddr, $tableName failed with $e", e)
- throw e
- }
- } else {
- logger.info(s"$zkAddr, $tableName, $cf already exist.")
- }
- }
/**
* update label name.
@@ -457,15 +268,102 @@ object Management extends JSONParser {
}
}
}
+}
+
+class Management(graph: Graph) {
+ import Management._
+ val storage = graph.storage
- // we only use murmur hash to distribute row key.
- def getStartKey(regionCount: Int): Array[Byte] = {
- Bytes.toBytes((Int.MaxValue / regionCount))
+ def createTable(zkAddr: String,
+ tableName: String,
+ cfs: List[String],
+ regionMultiplier: Int,
+ ttl: Option[Int],
+ compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
+ storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+
+ /** HBase specific code */
+ def createService(serviceName: String,
+ cluster: String, hTableName: String,
+ preSplitSize: Int, hTableTTL: Option[Int],
+ compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = {
+
+ Model withTx { implicit session =>
+ val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
+ /** create hbase table for service */
+ storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
+ service
+ }
}
- def getEndKey(regionCount: Int): Array[Byte] = {
- Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+ /** HBase specific code */
+ def createLabel(label: String,
+ srcServiceName: String,
+ srcColumnName: String,
+ srcColumnType: String,
+ tgtServiceName: String,
+ tgtColumnName: String,
+ tgtColumnType: String,
+ isDirected: Boolean = true,
+ serviceName: String,
+ indices: Seq[Index],
+ props: Seq[Prop],
+ consistencyLevel: String,
+ hTableName: Option[String],
+ hTableTTL: Option[Int],
+ schemaVersion: String = DEFAULT_VERSION,
+ isAsync: Boolean,
+ compressionAlgorithm: String = "gz"): Try[Label] = {
+
+ val labelOpt = Label.findByName(label, useCache = false)
+
+ Model withTx { implicit session =>
+ labelOpt match {
+ case Some(l) =>
+ throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already exist.")
+ case None =>
+ /** create all models */
+ val newLabel = Label.insertAll(label,
+ srcServiceName, srcColumnName, srcColumnType,
+ tgtServiceName, tgtColumnName, tgtColumnType,
+ isDirected, serviceName, indices, props, consistencyLevel,
+ hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm)
+
+ /** create hbase table */
+ val service = newLabel.service
+ (hTableName, hTableTTL) match {
+ case (None, None) => // do nothing
+ case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
+ case (Some(hbaseTableName), None) =>
+ // create own hbase table with default ttl on service level.
+ storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
+ case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
+ // create own hbase table with own ttl.
+ storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
+ }
+ newLabel
+ }
+ }
}
+ /**
+ * label
+ */
+ /**
+ * copy label when if oldLabel exist and newLabel do not exist.
+ * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster.
+ */
+ def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = {
+ val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists."))
+ if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.")
+
+ val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) }
+ val allIndices = old.indices.map { index => Index(index.name, index.propNames) }
-}
+ createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType,
+ old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType,
+ old.isDirected, old.serviceName,
+ allIndices, allProps,
+ old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
index 1e49abd..005a01e 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Label.scala
@@ -1,9 +1,5 @@
package com.kakao.s2graph.core.mysqls
-/**
- * Created by shon on 6/3/15.
- */
-
import com.kakao.s2graph.core.GraphExceptions.ModelNotFoundException
import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop}
import com.kakao.s2graph.core.utils.logger
@@ -141,7 +137,7 @@ object Label extends Model[Label] {
hTableTTL: Option[Int],
schemaVersion: String,
isAsync: Boolean,
- compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+ compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Label = {
val srcServiceOpt = Service.findByName(srcServiceName, useCache = false)
val tgtServiceOpt = Service.findByName(tgtServiceName, useCache = false)
@@ -188,18 +184,6 @@ object Label extends Model[Label] {
}
}
- /** TODO: */
- (hTableName, hTableTTL) match {
- case (None, None) => // do nothing
- case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also")
- case (Some(hbaseTableName), None) =>
- // create own hbase table with default ttl on service level.
- Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm)
- case (Some(hbaseTableName), Some(hbaseTableTTL)) =>
- // create own hbase table with own ttl.
- Management.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm)
- }
-
val cacheKeys = List(s"id=$createdId", s"label=$labelName")
val ret = findByName(labelName, useCache = false).get
putsToCache(cacheKeys.map(k => k -> ret))
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
index edd36df..2840db8 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala
@@ -1,8 +1,5 @@
package com.kakao.s2graph.core.mysqls
-/**
- * Created by shon on 6/3/15.
- */
import java.util.UUID
@@ -39,12 +36,11 @@ object Service extends Model[Service] {
def insert(serviceName: String, cluster: String,
hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
- compressionAlgorithm: String)(implicit session: DBSession = AutoSession) = {
+ compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
val accessToken = UUID.randomUUID().toString()
sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
- Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
}
def delete(id: Int)(implicit session: DBSession = AutoSession) = {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index 79303a5..bff0f3b 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -2,14 +2,16 @@ package com.kakao.s2graph.core.storage
import com.google.common.cache.Cache
import com.kakao.s2graph.core._
-import com.kakao.s2graph.core.mysqls.Label
+import com.kakao.s2graph.core.mysqls.{Service, Label}
import com.kakao.s2graph.core.utils.logger
+import com.typesafe.config.Config
import scala.collection.Seq
import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Try
-abstract class Storage(implicit ec: ExecutionContext) {
+abstract class Storage(val config: Config)(implicit ec: ExecutionContext) {
def cacheOpt: Option[Cache[Integer, Seq[QueryResult]]]
@@ -67,6 +69,12 @@ abstract class Storage(implicit ec: ExecutionContext) {
def flush(): Unit
+ def createTable(zkAddr: String,
+ tableName: String,
+ cfs: List[String],
+ regionMultiplier: Int,
+ ttl: Option[Int],
+ compressionAlgorithm: String): Unit
def toEdge[K: CanSKeyValue](kv: K,
queryParam: QueryParam,
@@ -133,5 +141,7 @@ abstract class Storage(implicit ec: ExecutionContext) {
}
}
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 74f580e..3ccb473 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -11,6 +11,12 @@ import com.kakao.s2graph.core.types._
import com.kakao.s2graph.core.utils.{Extensions, logger}
import com.stumbleupon.async.Deferred
import com.typesafe.config.Config
+import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.regionserver.BloomType
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.hbase.async._
import scala.collection.JavaConversions._
@@ -44,8 +50,8 @@ object AsynchbaseStorage {
}
}
-class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[Vertex]])
- (implicit ec: ExecutionContext) extends Storage {
+class AsynchbaseStorage(override val config: Config, vertexCache: Cache[Integer, Option[Vertex]])
+ (implicit ec: ExecutionContext) extends Storage(config) {
import AsynchbaseStorage._
@@ -737,4 +743,72 @@ class AsynchbaseStorage(val config: Config, vertexCache: Cache[Integer, Option[V
Await.result(client.flush().toFuture, timeout)
}
+
+ def createTable(zkAddr: String,
+ tableName: String,
+ cfs: List[String],
+ regionMultiplier: Int,
+ ttl: Option[Int],
+ compressionAlgorithm: String): Unit = {
+ logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
+ val admin = getAdmin(zkAddr)
+ val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ try {
+ val desc = new HTableDescriptor(TableName.valueOf(tableName))
+ desc.setDurability(Durability.ASYNC_WAL)
+ for (cf <- cfs) {
+ val columnDesc = new HColumnDescriptor(cf)
+ .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+ .setBloomFilterType(BloomType.ROW)
+ .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+ .setMaxVersions(1)
+ .setTimeToLive(2147483647)
+ .setMinVersions(0)
+ .setBlocksize(32768)
+ .setBlockCacheEnabled(true)
+ if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+ desc.addFamily(columnDesc)
+ }
+
+ if (regionCount <= 1) admin.createTable(desc)
+ else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
+ } catch {
+ case e: Throwable =>
+ logger.error(s"$zkAddr, $tableName failed with $e", e)
+ throw e
+ }
+ } else {
+ logger.info(s"$zkAddr, $tableName, $cfs already exist.")
+ }
+ }
+
+
+ private def getAdmin(zkAddr: String) = {
+ val conf = HBaseConfiguration.create()
+ conf.set("hbase.zookeeper.quorum", zkAddr)
+ val conn = ConnectionFactory.createConnection(conf)
+ conn.getAdmin
+ }
+ private def enableTable(zkAddr: String, tableName: String) = {
+ getAdmin(zkAddr).enableTable(TableName.valueOf(tableName))
+ }
+
+ private def disableTable(zkAddr: String, tableName: String) = {
+ getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+ }
+
+ private def dropTable(zkAddr: String, tableName: String) = {
+ getAdmin(zkAddr).disableTable(TableName.valueOf(tableName))
+ getAdmin(zkAddr).deleteTable(TableName.valueOf(tableName))
+ }
+
+ private def getStartKey(regionCount: Int): Array[Byte] = {
+ Bytes.toBytes((Int.MaxValue / regionCount))
+ }
+
+ private def getEndKey(regionCount: Int): Array[Byte] = {
+ Bytes.toBytes((Int.MaxValue / regionCount * (regionCount - 1)))
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
index 0ced48f..e60b824 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/Integrate/IntegrateCommon.scala
@@ -17,11 +17,13 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
var graph: Graph = _
var parser: RequestParser = _
+ var management: Management = _
var config: Config = _
override def beforeAll = {
config = ConfigFactory.load()
graph = new Graph(config)(ExecutionContext.Implicits.global)
+ management = new Management(graph)
parser = new RequestParser(graph.config)
initTestData()
}
@@ -43,7 +45,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
parser.toServiceElements(jsValue)
val tryRes =
- Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+ management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
println(s">> Service created : $createService, $tryRes")
val labelNames = Map(testLabelName -> testLabelNameCreate,
@@ -60,7 +62,7 @@ trait IntegrateCommon extends FunSuite with Matchers with BeforeAndAfterAll {
val json = Json.parse(create)
val tryRes = for {
labelArgs <- parser.toLabelElements(json)
- label <- (Management.createLabel _).tupled(labelArgs)
+ label <- (management.createLabel _).tupled(labelArgs)
} yield label
tryRes.get
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
index e2a3363..4fa7b59 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/TestCommonWithModels.scala
@@ -18,10 +18,12 @@ trait TestCommonWithModels {
var graph: Graph = _
var config: Config = _
+ var management: Management = _
def initTests() = {
config = ConfigFactory.load()
graph = new Graph(config)(ExecutionContext.Implicits.global)
+ management = new Management(graph)
implicit val session = AutoSession
@@ -75,8 +77,8 @@ trait TestCommonWithModels {
def createTestService() = {
implicit val session = AutoSession
- Management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
- Management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
+ management.createService(serviceName, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
+ management.createService(serviceNameV2, cluster, hTableName, preSplitSize, hTableTTL = None, "gz")
}
def deleteTestService() = {
@@ -95,16 +97,16 @@ trait TestCommonWithModels {
def createTestLabel() = {
implicit val session = AutoSession
- Management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
+ management.createLabel(labelName, serviceName, columnName, columnType, serviceName, columnName, columnType,
isDirected = true, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
- Management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
+ management.createLabel(labelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
isDirected = true, serviceNameV2, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
- Management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
+ management.createLabel(undirectedLabelName, serviceName, columnName, columnType, serviceName, tgtColumnName, tgtColumnType,
isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION1, false, "lg4")
- Management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
+ management.createLabel(undirectedLabelNameV2, serviceNameV2, columnNameV2, columnTypeV2, serviceNameV2, tgtColumnNameV2, tgtColumnTypeV2,
isDirected = false, serviceName, testIdxProps, testProps, consistencyLevel, Some(hTableName), hTableTTL, VERSION2, false, "lg4")
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
index 393d8b1..659983c 100644
--- a/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
+++ b/s2core/src/test/scala/com/kakao/s2graph/core/parsers/WhereParserTest.scala
@@ -8,6 +8,8 @@ import play.api.libs.json.Json
class WhereParserTest extends FunSuite with Matchers with TestCommonWithModels {
// dummy data for dummy edge
+ initTests()
+
import HBaseType.{VERSION1, VERSION2}
val ts = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
index b946f0a..a664de4 100644
--- a/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
+++ b/s2counter_core/src/main/scala/s2/counter/core/v1/ExactStorageHBase.scala
@@ -1,5 +1,6 @@
package s2.counter.core.v1
+import com.kakao.s2graph.core.Graph
import com.typesafe.config.Config
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client._
@@ -17,9 +18,7 @@ import scala.collection.JavaConversions._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
-/**
- * Created by hsleep(honeysleep@gmail.com) on 2015. 10. 1..
- */
+
class ExactStorageHBase(config: Config) extends ExactStorage {
import ExactStorageHBase._
@@ -30,6 +29,8 @@ class ExactStorageHBase(config: Config) extends ExactStorage {
private[counter] val withHBase = new WithHBase(config)
private[counter] val hbaseManagement = new Management(config)
+
+
private def getTableName(policy: Counter): String = {
policy.hbaseTable.getOrElse(s2config.HBASE_TABLE_NAME)
}
@@ -280,6 +281,7 @@ class ExactStorageHBase(config: Config) extends ExactStorage {
override def prepare(policy: Counter): Unit = {
// create hbase table
policy.hbaseTable.foreach { table =>
+
if (!hbaseManagement.tableExists(s2config.HBASE_ZOOKEEPER_QUORUM, table)) {
hbaseManagement.createTable(s2config.HBASE_ZOOKEEPER_QUORUM, table,
ColumnFamily.values.map(_.toString).toList, 1)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
index 8547bc8..3cf9181 100644
--- a/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
+++ b/s2counter_core/src/main/scala/s2/helper/CounterAdmin.scala
@@ -1,5 +1,6 @@
package s2.helper
+import com.kakao.s2graph.core.Graph
import com.kakao.s2graph.core.mysqls.Label
import com.typesafe.config.Config
import play.api.libs.json.Json
@@ -18,11 +19,13 @@ class CounterAdmin(config: Config) {
val s2config = new S2CounterConfig(config)
val counterModel = new CounterModel(config)
val graphOp = new GraphOperation(config)
+ val s2graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+ val storageManagement = new com.kakao.s2graph.core.Management(s2graph)
def setupCounterOnGraph(): Unit = {
// create s2counter service
val service = "s2counter"
- com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
+ storageManagement.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"$service-${config.getString("phase")}", 1, None, "gz")
// create bucket label
val label = "s2counter_topK_bucket"
if (Label.findByName(label, useCache = false).isEmpty) {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
----------------------------------------------------------------------
diff --git a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
index dd6a6bf..ea67a2a 100644
--- a/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
+++ b/s2counter_core/src/test/scala/s2/counter/core/RankingCounterSpec.scala
@@ -1,5 +1,6 @@
package s2.counter.core
+import com.kakao.s2graph.core.{Management, Graph}
import com.kakao.s2graph.core.mysqls.Label
import com.typesafe.config.ConfigFactory
import org.specs2.mutable.Specification
@@ -70,7 +71,9 @@ class RankingCounterSpec extends Specification with BeforeAfterAll {
}
val graphOp = new GraphOperation(config)
- com.kakao.s2graph.core.Management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
+ val graph = new Graph(config)(scala.concurrent.ExecutionContext.global)
+ val management = new Management(graph)
+ management.createService(service, s2config.HBASE_ZOOKEEPER_QUORUM, s"${service}_dev", 1, None, "gz")
val strJs =
s"""
|{
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/Bootstrap.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/Bootstrap.scala b/s2rest_play/app/Bootstrap.scala
index a661ac3..a1acb10 100644
--- a/s2rest_play/app/Bootstrap.scala
+++ b/s2rest_play/app/Bootstrap.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.Executors
import actors.QueueActor
import com.kakao.s2graph.core.rest.RequestParser
import com.kakao.s2graph.core.utils.logger
-import com.kakao.s2graph.core.{ExceptionHandler, Graph}
+import com.kakao.s2graph.core.{Management, ExceptionHandler, Graph}
import config.Config
import controllers.{AdminController, ApplicationController}
import play.api.Application
@@ -18,6 +18,7 @@ import scala.util.Try
object Global extends WithFilters(new GzipFilter()) {
var s2graph: Graph = _
+ var storageManagement: Management = _
var s2parser: RequestParser = _
// Application entry point
@@ -32,6 +33,7 @@ object Global extends WithFilters(new GzipFilter()) {
// init s2graph with config
s2graph = new Graph(config)(ec)
+ storageManagement = new Management(s2graph)
s2parser = new RequestParser(s2graph.config) // merged config
QueueActor.init(s2graph)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66be5c8c/s2rest_play/app/controllers/AdminController.scala
----------------------------------------------------------------------
diff --git a/s2rest_play/app/controllers/AdminController.scala b/s2rest_play/app/controllers/AdminController.scala
index 2eb248b..bb73c40 100644
--- a/s2rest_play/app/controllers/AdminController.scala
+++ b/s2rest_play/app/controllers/AdminController.scala
@@ -14,6 +14,7 @@ import scala.util.{Failure, Success, Try}
object AdminController extends Controller {
import ApplicationController._
+ private val management: Management = com.kakao.s2graph.rest.Global.storageManagement
private val requestParser: RequestParser = com.kakao.s2graph.rest.Global.s2parser
/**
@@ -181,9 +182,10 @@ object AdminController extends Controller {
tryResponse(serviceTry)(_.toJson)
}
+
def createServiceInner(jsValue: JsValue) = {
val (serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm) = requestParser.toServiceElements(jsValue)
- Management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
+ management.createService(serviceName, cluster, tableName, preSplitSize, ttl, compressionAlgorithm)
}
/**
@@ -195,9 +197,10 @@ object AdminController extends Controller {
tryResponse(ret)(_.toJson)
}
+
def createLabelInner(json: JsValue) = for {
labelArgs <- requestParser.toLabelElements(json)
- label <- (Management.createLabel _).tupled(labelArgs)
+ label <- (management.createLabel _).tupled(labelArgs)
} yield label
/**
@@ -322,7 +325,7 @@ object AdminController extends Controller {
* @return
*/
def copyLabel(oldLabelName: String, newLabelName: String) = Action { request =>
- val copyTry = Management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
+ val copyTry = management.copyLabel(oldLabelName, newLabelName, Some(newLabelName))
tryResponse(copyTry)(_.label + "created")
}
@@ -408,7 +411,9 @@ object AdminController extends Controller {
// Management.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm)
request.body.asJson.map(_.validate[HTableParams] match {
case JsSuccess(hTableParams, _) => {
- Management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"), hTableParams.preSplitSize, hTableParams.hTableTTL, hTableParams.compressionAlgorithm.getOrElse(Management.defaultCompressionAlgorithm))
+ management.createTable(hTableParams.cluster, hTableParams.hTableName, List("e", "v"),
+ hTableParams.preSplitSize, hTableParams.hTableTTL,
+ hTableParams.compressionAlgorithm.getOrElse(Management.DefaultCompressionAlgorithm))
logger.info(hTableParams.toString())
ok(s"HTable was created.")
}