You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/11/16 16:33:36 UTC
[3/7] incubator-s2graph git commit: [S2GRAPH-122]: Change data types
of Edge/IndexEdge/SnapshotEdge.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
index 69926fa..d2a7de7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageDeserializable.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.core.storage
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.QueryParam
+import org.apache.s2graph.core.mysqls.{LabelMeta, Label}
import org.apache.s2graph.core.types.{HBaseType, InnerVal, InnerValLike, InnerValLikeWithTs}
import org.apache.s2graph.core.utils.logger
@@ -36,16 +37,17 @@ object StorageDeserializable {
def bytesToKeyValues(bytes: Array[Byte],
offset: Int,
length: Int,
- version: String): (Array[(Byte, InnerValLike)], Int) = {
+ schemaVer: String,
+ label: Label): (Array[(LabelMeta, InnerValLike)], Int) = {
var pos = offset
val len = bytes(pos)
pos += 1
- val kvs = new Array[(Byte, InnerValLike)](len)
+ val kvs = new Array[(LabelMeta, InnerValLike)](len)
var i = 0
while (i < len) {
- val k = bytes(pos)
+ val k = label.labelMetaMap(bytes(pos))
pos += 1
- val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+ val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
pos += numOfBytesUsed
kvs(i) = (k -> v)
i += 1
@@ -57,16 +59,17 @@ object StorageDeserializable {
def bytesToKeyValuesWithTs(bytes: Array[Byte],
offset: Int,
- version: String): (Array[(Byte, InnerValLikeWithTs)], Int) = {
+ schemaVer: String,
+ label: Label): (Array[(LabelMeta, InnerValLikeWithTs)], Int) = {
var pos = offset
val len = bytes(pos)
pos += 1
- val kvs = new Array[(Byte, InnerValLikeWithTs)](len)
+ val kvs = new Array[(LabelMeta, InnerValLikeWithTs)](len)
var i = 0
while (i < len) {
- val k = bytes(pos)
+ val k = label.labelMetaMap(bytes(pos))
pos += 1
- val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, version)
+ val (v, numOfBytesUsed) = InnerValLikeWithTs.fromBytes(bytes, pos, 0, schemaVer)
pos += numOfBytesUsed
kvs(i) = (k -> v)
i += 1
@@ -78,15 +81,15 @@ object StorageDeserializable {
def bytesToProps(bytes: Array[Byte],
offset: Int,
- version: String): (Array[(Byte, InnerValLike)], Int) = {
+ schemaVer: String): (Array[(LabelMeta, InnerValLike)], Int) = {
var pos = offset
val len = bytes(pos)
pos += 1
- val kvs = new Array[(Byte, InnerValLike)](len)
+ val kvs = new Array[(LabelMeta, InnerValLike)](len)
var i = 0
while (i < len) {
- val k = HBaseType.EMPTY_SEQ_BYTE
- val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
+ val k = LabelMeta.empty
+ val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, schemaVer)
pos += numOfBytesUsed
kvs(i) = (k -> v)
i += 1
@@ -98,17 +101,19 @@ object StorageDeserializable {
}
def bytesToLong(bytes: Array[Byte], offset: Int): Long = Bytes.toLong(bytes, offset)
+
+ def bytesToInt(bytes: Array[Byte], offset: Int): Int = Bytes.toInt(bytes, offset)
}
trait StorageDeserializable[E] {
- def fromKeyValues[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
+ def fromKeyValues[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): Option[E] = {
try {
- Option(fromKeyValuesInner(queryParam, kvs, version, cacheElementOpt))
+ Option(fromKeyValuesInner(checkLabel, kvs, version, cacheElementOpt))
} catch {
case e: Exception =>
logger.error(s"${this.getClass.getName} fromKeyValues failed.", e)
None
}
}
- def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam, kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
+ def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], kvs: Seq[T], version: String, cacheElementOpt: Option[E]): E
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
index b7326f5..c1efe7b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageSerializable.scala
@@ -20,31 +20,33 @@
package org.apache.s2graph.core.storage
import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.types.{InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
object StorageSerializable {
/** serializer */
- def propsToBytes(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+ def propsToBytes(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
val len = props.length
assert(len < Byte.MaxValue)
var bytes = Array.fill(1)(len.toByte)
- for ((k, v) <- props) bytes = Bytes.add(bytes, v.bytes)
+ for ((_, v) <- props) bytes = Bytes.add(bytes, v.bytes)
bytes
}
- def propsToKeyValues(props: Seq[(Byte, InnerValLike)]): Array[Byte] = {
+ def propsToKeyValues(props: Seq[(LabelMeta, InnerValLike)]): Array[Byte] = {
val len = props.length
assert(len < Byte.MaxValue)
var bytes = Array.fill(1)(len.toByte)
- for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+ for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
bytes
}
- def propsToKeyValuesWithTs(props: Seq[(Byte, InnerValLikeWithTs)]): Array[Byte] = {
+ def propsToKeyValuesWithTs(props: Seq[(LabelMeta, InnerValLikeWithTs)]): Array[Byte] = {
val len = props.length
assert(len < Byte.MaxValue)
var bytes = Array.fill(1)(len.toByte)
- for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k), v.bytes)
+ for ((k, v) <- props) bytes = Bytes.add(bytes, Array.fill(1)(k.seq), v.bytes)
bytes
}
@@ -53,13 +55,17 @@ object StorageSerializable {
val byte = labelOrderSeq << 1 | (if (isInverted) 1 else 0)
Array.fill(1)(byte.toByte)
}
+
+ def intToBytes(value: Int): Array[Byte] = Bytes.toBytes(value)
+
+ def longToBytes(value: Long): Array[Byte] = Bytes.toBytes(value)
}
trait StorageSerializable[E] {
val cf = Serializable.edgeCf
- val table: Array[Byte]
- val ts: Long
+ def table: Array[Byte]
+ def ts: Long
def toRowKey: Array[Byte]
def toQualifier: Array[Byte]
@@ -70,7 +76,7 @@ trait StorageSerializable[E] {
val qualifier = toQualifier
val value = toValue
val kv = SKeyValue(table, row, cf, qualifier, value, ts)
-
+// logger.debug(s"[SER]: ${kv.toLogString}}")
Seq(kv)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index b52ba53..e63dfea 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -19,11 +19,13 @@
package org.apache.s2graph.core.storage.hbase
+
+
import java.util
import java.util.Base64
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.{Config, ConfigFactory}
+import com.stumbleupon.async.{Callback, Deferred}
+import com.typesafe.config.Config
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ConnectionFactory, Durability}
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
@@ -35,15 +37,17 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.utils._
import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
import org.hbase.async._
import scala.collection.JavaConversions._
-import scala.collection.{Map, Seq}
-import scala.concurrent.duration.Duration
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.Try
import scala.util.hashing.MurmurHash3
@@ -79,12 +83,15 @@ object AsynchbaseStorage {
logger.info(s"Asynchbase: ${client.getConfig.dumpConfiguration()}")
client
}
+
+ case class ScanWithRange(scan: Scanner, offset: Int, limit: Int)
+ type AsyncRPC = Either[GetRequest, ScanWithRange]
}
class AsynchbaseStorage(override val graph: Graph,
override val config: Config)(implicit ec: ExecutionContext)
- extends Storage[Deferred[StepInnerResult]](graph, config) {
+ extends Storage[AsyncRPC, Deferred[StepResult]](graph, config) {
import Extensions.DeferOps
@@ -92,26 +99,41 @@ class AsynchbaseStorage(override val graph: Graph,
* Asynchbase client setup.
* note that we need two client, one for bulk(withWait=false) and another for withWait=true
*/
- val configWithFlush = config.withFallback(ConfigFactory.parseMap(Map("hbase.rpcs.buffered_flush_interval" -> "0")))
- val client = AsynchbaseStorage.makeClient(config)
-
- private val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
- private val clients = Seq(client, clientWithFlush)
private val clientFlushInterval = config.getInt("hbase.rpcs.buffered_flush_interval").toString().toShort
+
+ /**
+ * since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase.
+ * to fix version conflict, make this as lazy val for clients that don't require hbase client.
+ */
+ lazy val client = AsynchbaseStorage.makeClient(config)
+ lazy val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
+ lazy val clients = Seq(client, clientWithFlush)
+
private val emptyKeyValues = new util.ArrayList[KeyValue]()
+ private val emptyStepResult = new util.ArrayList[StepResult]()
+
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+ import CanDefer._
+
/** Future Cache to squash request */
- private val futureCache = new DeferCache[StepInnerResult, Deferred, Deferred](config, StepInnerResult.Empty, "FutureCache", useMetric = true)
+ lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
/** Simple Vertex Cache */
- private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
+ lazy private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
private val zkQuorum = config.getString("hbase.zookeeper.quorum")
private val zkQuorumSlave =
- if (config.hasPath("hbase.zookeeper.quorum")) Option(config.getString("hbase.zookeeper.quorum"))
+ if (config.hasPath("hbase.slave.zookeeper.quorum")) Option(config.getString("hbase.slave.zookeeper.quorum"))
else None
+ /** v4 max next row size */
+ private val v4_max_num_rows = 10000
+ private def getV4MaxNumRows(limit : Int): Int = {
+ if (limit < v4_max_num_rows) limit
+ else v4_max_num_rows
+ }
+
/**
* fire rpcs into proper hbase cluster using client and
* return true on all mutation success. otherwise return false.
@@ -120,37 +142,75 @@ class AsynchbaseStorage(override val graph: Graph,
if (kvs.isEmpty) Future.successful(true)
else {
val _client = client(withWait)
- val futures = kvs.map { kv =>
- val _defer = kv.operation match {
- case SKeyValue.Put => _client.put(new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp))
- case SKeyValue.Delete =>
- if (kv.qualifier == null) _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.timestamp))
- else _client.delete(new DeleteRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.timestamp))
- case SKeyValue.Increment =>
- _client.atomicIncrement(new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value)))
- }
- val future = _defer.withCallback { ret => true }.recoverWith { ex =>
+ val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
+
+ /** Asynchbase IncrementRequest does not implement HasQualifiers */
+ val incrementsFutures = increments.map { kv =>
+ val inc = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+ val defer = _client.atomicIncrement(inc)
+ val future = defer.toFuture(Long.box(0)).map(_ => true).recover { case ex: Exception =>
logger.error(s"mutation failed. $kv", ex)
false
- }.toFuture
-
+ }
if (withWait) future else Future.successful(true)
}
- Future.sequence(futures).map(_.forall(identity))
+ /** PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+ val othersFutures = putAndDeletes.groupBy { kv =>
+ (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
+ }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
+
+ val durability = groupedKeyValues.head.durability
+ val qualifiers = new ArrayBuffer[Array[Byte]]()
+ val values = new ArrayBuffer[Array[Byte]]()
+
+ groupedKeyValues.foreach { kv =>
+ if (kv.qualifier != null) qualifiers += kv.qualifier
+ if (kv.value != null) values += kv.value
+ }
+ val defer = operation match {
+ case SKeyValue.Put =>
+ val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
+ put.setDurable(durability)
+ _client.put(put)
+ case SKeyValue.Delete =>
+ val delete =
+ if (qualifiers.isEmpty)
+ new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
+ else
+ new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
+ delete.setDurable(durability)
+ _client.delete(delete)
+ }
+ if (withWait) {
+ defer.toFuture(new AnyRef()).map(_ => true).recover { case ex: Exception =>
+ groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
+ false
+ }
+ } else Future.successful(true)
+ }
+ for {
+ incrementRets <- Future.sequence(incrementsFutures)
+ otherRets <- Future.sequence(othersFutures)
+ } yield (incrementRets ++ otherRets).forall(identity)
}
}
-
- override def fetchSnapshotEdgeKeyValues(hbaseRpc: AnyRef): Future[Seq[SKeyValue]] = {
- val defer = fetchKeyValuesInner(hbaseRpc)
- defer.toFuture.map { kvsArr =>
+ private def fetchKeyValues(rpc: AsyncRPC): Future[Seq[SKeyValue]] = {
+ val defer = fetchKeyValuesInner(rpc)
+ defer.toFuture(emptyKeyValues).map { kvsArr =>
kvsArr.map { kv =>
implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
- } toSeq
+ }
}
}
+ override def fetchSnapshotEdgeKeyValues(queryRequest: QueryRequest): Future[Seq[SKeyValue]] = {
+ val edge = toRequestEdge(queryRequest, Nil)
+ val rpc = buildRequest(queryRequest, edge)
+ fetchKeyValues(rpc)
+ }
+
/**
* since HBase natively provide CheckAndSet on storage level, implementation becomes simple.
* @param rpc: key value that is need to be stored on storage.
@@ -162,7 +222,7 @@ class AsynchbaseStorage(override val graph: Graph,
override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue]): Future[Boolean] = {
val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
- client(withWait = true).compareAndSet(put, expected).withCallback(ret => ret.booleanValue()).toFuture
+ client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
}
@@ -182,24 +242,24 @@ class AsynchbaseStorage(override val graph: Graph,
* @param queryRequest
* @return Scanner or GetRequest with proper setup with StartKey, EndKey, RangeFilter.
*/
- override def buildRequest(queryRequest: QueryRequest): AnyRef = {
+ override def buildRequest(queryRequest: QueryRequest, edge: Edge): AsyncRPC = {
import Serializable._
val queryParam = queryRequest.queryParam
val label = queryParam.label
- val edge = toRequestEdge(queryRequest)
val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
val snapshotEdge = edge.toSnapshotEdge
snapshotEdgeSerializer(snapshotEdge)
} else {
- val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.labelWithDir,
+ val indexEdge = IndexEdge(edge.srcVertex, edge.tgtVertex, edge.label, edge.dir,
edge.op, edge.version, queryParam.labelOrderSeq, edge.propsWithTs)
indexEdgeSerializer(indexEdge)
}
- val (rowKey, qualifier) = (serializer.toRowKey, serializer.toQualifier)
+ val rowKey = serializer.toRowKey
+ val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
- val (minTs, maxTs) = queryParam.duration.getOrElse((0L, Long.MaxValue))
+ val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
label.schemaVersion match {
case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
@@ -215,28 +275,26 @@ class AsynchbaseStorage(override val graph: Graph,
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
- // val labelIndexSeqWithIsInvertedStopBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = true)
- val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, Bytes.add(labelIndexSeqWithIsInvertedBytes, Array.fill(1)(edge.op)))
+ val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
val (startKey, stopKey) =
- if (queryParam.columnRangeFilter != null) {
+ if (queryParam.intervalOpt.isDefined) {
// interval is set.
val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
- case None => Bytes.add(baseKey, queryParam.columnRangeFilterMinBytes)
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
+ case None => Bytes.add(baseKey, intervalMaxBytes)
}
- (_startKey, Bytes.add(baseKey, queryParam.columnRangeFilterMaxBytes))
+ (_startKey , Bytes.add(baseKey, intervalMinBytes))
} else {
- /*
- * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
- */
+ /**
+ * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
+ */
val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Bytes.add(Base64.getDecoder.decode(cursor), Array.fill(1)(0))
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
case None => baseKey
}
(_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
}
-// logger.debug(s"[StartKey]: ${startKey.toList}")
-// logger.debug(s"[StopKey]: ${stopKey.toList}")
scanner.setStartKey(startKey)
scanner.setStopKey(stopKey)
@@ -244,15 +302,23 @@ class AsynchbaseStorage(override val graph: Graph,
if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
scanner.setMaxVersions(1)
- scanner.setMaxNumRows(queryParam.offset + queryParam.limit)
+ // TODO: exclusive condition innerOffset with cursorOpt
+ if (queryParam.cursorOpt.isDefined) {
+ scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
+ } else {
+ scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit))
+ }
scanner.setMaxTimestamp(maxTs)
scanner.setMinTimestamp(minTs)
- scanner.setRpcTimeout(queryParam.rpcTimeoutInMillis)
+ scanner.setRpcTimeout(queryParam.rpcTimeout)
+
// SET option for this rpc properly.
- scanner
+ if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
+ else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
+
case _ =>
val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
- new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, qualifier)
+ new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier)
} else {
new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
}
@@ -261,12 +327,14 @@ class AsynchbaseStorage(override val graph: Graph,
get.setFailfast(true)
get.setMinTimestamp(minTs)
get.setMaxTimestamp(maxTs)
- get.setTimeout(queryParam.rpcTimeoutInMillis)
+ get.setTimeout(queryParam.rpcTimeout)
val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
- get.setFilter(new FilterList(pagination +: Option(queryParam.columnRangeFilter).toSeq, MUST_PASS_ALL))
-
- get
+ val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+ new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+ }
+ get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
+ Left(get)
}
}
@@ -274,81 +342,81 @@ class AsynchbaseStorage(override val graph: Graph,
* we are using future cache to squash requests into same key on storage.
*
* @param queryRequest
- * @param prevStepScore
* @param isInnerCall
* @param parentEdges
* @return we use Deferred here since it has much better performrance compared to scala.concurrent.Future.
* seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback
*/
override def fetch(queryRequest: QueryRequest,
- prevStepScore: Double,
isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore]): Deferred[StepInnerResult] = {
+ parentEdges: Seq[EdgeWithScore]): Deferred[StepResult] = {
- def fetchInner(hbaseRpc: AnyRef): Deferred[StepInnerResult] = {
- val queryParam = queryRequest.queryParam
+ def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+ val prevStepScore = queryRequest.prevStepScore
+ val fallbackFn: (Exception => StepResult) = { ex =>
+ logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
+ StepResult.Failure
+ }
- fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
- val (startOffset, length) = queryParam.label.schemaVersion match {
- case HBaseType.VERSION4 => (queryParam.offset, queryParam.limit)
+ val queryParam = queryRequest.queryParam
+ fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs =>
+ val (startOffset, len) = queryParam.label.schemaVersion match {
+ case HBaseType.VERSION4 =>
+ val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
+ (offset, queryParam.limit)
case _ => (0, kvs.length)
}
- val edgeWithScores = toEdges(kvs, queryParam, prevStepScore, isInnerCall, parentEdges, startOffset, length)
- if (edgeWithScores.isEmpty) StepInnerResult.Empty
- else {
- val head = edgeWithScores.head
- val (degreeEdges, indexEdges) =
- if (head.edge.isDegree) (Seq(head), edgeWithScores.tail)
- else (Nil, edgeWithScores)
-
- val normalized =
- if (queryRequest.queryParam.shouldNormalize) normalize(indexEdges)
- else indexEdges
-
- val sampled = if (queryRequest.queryParam.sample >= 0) {
- sample(queryRequest, normalized, queryRequest.queryParam.sample)
- } else normalized
-
- StepInnerResult(edgesWithScoreLs = sampled, degreeEdges)
- }
- } recoverWith { ex =>
- logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
- StepInnerResult.Failure
+ toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
}
}
val queryParam = queryRequest.queryParam
val cacheTTL = queryParam.cacheTTLInMillis
- val request = buildRequest(queryRequest)
+ /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
+
+ val edge = toRequestEdge(queryRequest, parentEdges)
+ val request = buildRequest(queryRequest, edge)
+ val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+ val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
if (cacheTTL <= 0) fetchInner(request)
else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+ val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+// val cacheKeyBytes = toCacheKeyBytes(request)
val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
}
}
+ override def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]]): Future[Seq[StepResult]] = {
+ val defers: Seq[Deferred[StepResult]] = for {
+ queryRequest <- queryRequests
+ } yield {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+ val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+ fetch(queryRequest, isInnerCall = false, parentEdges)
+ }
- override def fetches(queryRequestWithScoreLs: scala.Seq[(QueryRequest, Double)],
- prevStepEdges: Predef.Map[VertexId, scala.Seq[EdgeWithScore]]): Future[scala.Seq[StepInnerResult]] = {
- val defers: Seq[Deferred[StepInnerResult]] = for {
- (queryRequest, prevStepScore) <- queryRequestWithScoreLs
- parentEdges <- prevStepEdges.get(queryRequest.vertex.id)
- } yield fetch(queryRequest, prevStepScore, isInnerCall = false, parentEdges)
-
- val grouped: Deferred[util.ArrayList[StepInnerResult]] = Deferred.group(defers)
- grouped withCallback {
- queryResults: util.ArrayList[StepInnerResult] =>
- queryResults.toIndexedSeq
- } toFuture
+ val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers)
+ grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+ queryResults.toSeq
+ }.toFuture(emptyStepResult)
}
- def fetchVertexKeyValues(request: AnyRef): Future[Seq[SKeyValue]] = fetchSnapshotEdgeKeyValues(request)
+ def fetchVertexKeyValues(request: QueryRequest): Future[Seq[SKeyValue]] = {
+ val edge = toRequestEdge(request, Nil)
+ fetchKeyValues(buildRequest(request, edge))
+ }
+ def fetchVertexKeyValues(request: AsyncRPC): Future[Seq[SKeyValue]] = fetchKeyValues(request)
+
/**
* when withWait is given, we use client with flushInterval set to 0.
* if we are not using this, then we are adding extra wait time as much as flushInterval in worst case.
@@ -357,35 +425,44 @@ class AsynchbaseStorage(override val graph: Graph,
* @param withWait
* @return
*/
- override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = {
+ override def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = {
+
val _client = client(withWait)
- val defers: Seq[Deferred[(Boolean, Long)]] = for {
+ val defers: Seq[Deferred[(Boolean, Long, Long)]] = for {
edge <- edges
} yield {
- val edgeWithIndex = edge.edgesWithIndex.head
- val countWithTs = edge.propsWithTs(LabelMeta.countSeq)
- val countVal = countWithTs.innerVal.toString().toLong
- val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
- val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
- val defer = _client.bufferAtomicIncrement(request) withCallback { resultCount: java.lang.Long =>
- (true, resultCount.longValue())
- } recoverWith { ex =>
- logger.error(s"mutation failed. $request", ex)
- (false, -1L)
+ val futures: List[Deferred[(Boolean, Long, Long)]] = for {
+ relEdge <- edge.relatedEdges
+ edgeWithIndex <- relEdge.edgesWithIndexValid
+ } yield {
+ val countWithTs = edge.propsWithTs(LabelMeta.count)
+ val countVal = countWithTs.innerVal.toString().toLong
+ val kv = buildIncrementsCountAsync(edgeWithIndex, countVal).head
+ val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, Bytes.toLong(kv.value))
+ val fallbackFn: (Exception => (Boolean, Long, Long)) = { ex =>
+ logger.error(s"mutation failed. $request", ex)
+ (false, -1L, -1L)
+ }
+ val defer = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
+ (true, resultCount.longValue(), countVal)
+ }
+ if (withWait) defer
+ else Deferred.fromResult((true, -1L, -1L))
}
- if (withWait) defer
- else Deferred.fromResult((true, -1L))
+
+ val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.group(futures)
+ grouped.map(new util.ArrayList[(Boolean, Long, Long)]()) { resultLs => resultLs.head }
}
- val grouped: Deferred[util.ArrayList[(Boolean, Long)]] = Deferred.groupInOrder(defers)
- grouped.toFuture.map(_.toSeq)
+ val grouped: Deferred[util.ArrayList[(Boolean, Long, Long)]] = Deferred.groupInOrder(defers)
+ grouped.toFuture(new util.ArrayList[(Boolean, Long, Long)]()).map(_.toSeq)
}
override def flush(): Unit = clients.foreach { client =>
super.flush()
val timeout = Duration((clientFlushInterval + 10) * 20, duration.MILLISECONDS)
- Await.result(client.flush().toFuture, timeout)
+ Await.result(client.flush().toFuture(new AnyRef), timeout)
}
@@ -394,40 +471,37 @@ class AsynchbaseStorage(override val graph: Graph,
cfs: List[String],
regionMultiplier: Int,
ttl: Option[Int],
- compressionAlgorithm: String): Unit = {
+ compressionAlgorithm: String,
+ replicationScopeOpt: Option[Int] = None,
+ totalRegionCount: Option[Int] = None): Unit = {
/** TODO: Decide if we will allow each app server to connect to multiple hbase cluster */
for {
zkAddr <- Seq(zkQuorum) ++ zkQuorumSlave.toSeq
} {
logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
val admin = getAdmin(zkAddr)
- val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
+ val regionCount = totalRegionCount.getOrElse(admin.getClusterStatus.getServersSize * regionMultiplier)
try {
if (!admin.tableExists(TableName.valueOf(tableName))) {
- try {
- val desc = new HTableDescriptor(TableName.valueOf(tableName))
- desc.setDurability(Durability.ASYNC_WAL)
- for (cf <- cfs) {
- val columnDesc = new HColumnDescriptor(cf)
- .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
- .setBloomFilterType(BloomType.ROW)
- .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
- .setMaxVersions(1)
- .setTimeToLive(2147483647)
- .setMinVersions(0)
- .setBlocksize(32768)
- .setBlockCacheEnabled(true)
- if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
- desc.addFamily(columnDesc)
- }
-
- if (regionCount <= 1) admin.createTable(desc)
- else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
- } catch {
- case e: Throwable =>
- logger.error(s"$zkAddr, $tableName failed with $e", e)
- throw e
+ val desc = new HTableDescriptor(TableName.valueOf(tableName))
+ desc.setDurability(Durability.ASYNC_WAL)
+ for (cf <- cfs) {
+ val columnDesc = new HColumnDescriptor(cf)
+ .setCompressionType(Algorithm.valueOf(compressionAlgorithm.toUpperCase))
+ .setBloomFilterType(BloomType.ROW)
+ .setDataBlockEncoding(DataBlockEncoding.FAST_DIFF)
+ .setMaxVersions(1)
+ .setTimeToLive(2147483647)
+ .setMinVersions(0)
+ .setBlocksize(32768)
+ .setBlockCacheEnabled(true)
+ if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+ if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
+ desc.addFamily(columnDesc)
}
+
+ if (regionCount <= 1) admin.createTable(desc)
+ else admin.createTable(desc, getStartKey(regionCount), getEndKey(regionCount), regionCount)
} else {
logger.info(s"$zkAddr, $tableName, $cfs already exist.")
}
@@ -445,12 +519,12 @@ class AsynchbaseStorage(override val graph: Graph,
/** Asynchbase implementation override default getVertices to use future Cache */
override def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = {
- def fromResult(queryParam: QueryParam,
- kvs: Seq[SKeyValue],
+ def fromResult(kvs: Seq[SKeyValue],
version: String): Option[Vertex] = {
if (kvs.isEmpty) None
- else vertexDeserializer.fromKeyValues(queryParam, kvs, version, None)
+ else vertexDeserializer.fromKeyValues(None, kvs, version, None)
+// .map(S2Vertex(graph, _))
}
val futures = vertices.map { vertex =>
@@ -461,55 +535,85 @@ class AsynchbaseStorage(override val graph: Graph,
get.maxVersions(1)
val cacheKey = MurmurHash3.stringHash(get.toString)
- vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(get)).map { kvs =>
- fromResult(QueryParam.Empty, kvs, vertex.serviceColumn.schemaVersion)
+ vertexCache.getOrElseUpdate(cacheKey, cacheTTL = 10000)(fetchVertexKeyValues(Left(get))).map { kvs =>
+ fromResult(kvs, vertex.serviceColumn.schemaVersion)
}
}
Future.sequence(futures).map { result => result.toList.flatten }
}
+ class V4ResultHandler(scanner: Scanner, defer: Deferred[util.ArrayList[KeyValue]], offset: Int, limit : Int) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {
+ val results = new util.ArrayList[KeyValue]()
+ var offsetCount = 0
+ override def call(kvsLs: util.ArrayList[util.ArrayList[KeyValue]]): Object = {
+ try {
+ if (kvsLs == null) {
+ defer.callback(results)
+ Try(scanner.close())
+ } else {
+ val curRet = new util.ArrayList[KeyValue]()
+ kvsLs.foreach(curRet.addAll(_))
+ val prevOffset = offsetCount
+ offsetCount += curRet.size()
+
+ val nextRet = if(offsetCount > offset){
+ if(prevOffset < offset ) {
+ curRet.subList(offset - prevOffset, curRet.size())
+ } else{
+ curRet
+ }
+ } else{
+ emptyKeyValues
+ }
+ val needCount = limit - results.size()
+ if (needCount >= nextRet.size()) {
+ results.addAll(nextRet)
+ } else {
+ results.addAll(nextRet.subList(0, needCount))
+ }
+ if (results.size() < limit) {
+ scanner.nextRows().addCallback(this)
+ } else {
+ defer.callback(results)
+ Try(scanner.close())
+ }
+ }
+ } catch{
+ case ex: Exception =>
+ logger.error(s"fetchKeyValuesInner failed.", ex)
+ defer.callback(ex)
+ Try(scanner.close())
+ }
+ }
+ }
/**
* Private Methods which is specific to Asynchbase implementation.
*/
- private def fetchKeyValuesInner(rpc: AnyRef): Deferred[util.ArrayList[KeyValue]] = {
+ private def fetchKeyValuesInner(rpc: AsyncRPC): Deferred[util.ArrayList[KeyValue]] = {
rpc match {
- case getRequest: GetRequest => client.get(getRequest)
- case scanner: Scanner =>
- scanner.nextRows().withCallback { kvsLs =>
- val ls = new util.ArrayList[KeyValue]
- if (kvsLs == null) {
-
- } else {
- kvsLs.foreach { kvs =>
- if (kvs != null) kvs.foreach { kv => ls.add(kv) }
- else {
-
- }
- }
- }
- scanner.close()
- ls
- }.recoverWith { ex =>
- logger.error(s"fetchKeyValuesInner failed.", ex)
- scanner.close()
- emptyKeyValues
- }
+ case Left(get) => client.get(get)
+ case Right(ScanWithRange(scanner, offset, limit)) =>
+ val deferred = new Deferred[util.ArrayList[KeyValue]]()
+ scanner.nextRows().addCallback(new V4ResultHandler(scanner, deferred, offset, limit))
+ deferred
case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
}
}
- private def toCacheKeyBytes(hbaseRpc: AnyRef): Array[Byte] = {
+ private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
+ /** with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
hbaseRpc match {
- case getRequest: GetRequest => getRequest.key()
- case scanner: Scanner => scanner.getCurrentKey()
+ case Left(getRequest) => getRequest.key
+ case Right(ScanWithRange(scanner, offset, limit)) =>
+ Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)))
case _ =>
logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
- Array.empty[Byte]
+ throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
}
}
@@ -520,8 +624,6 @@ class AsynchbaseStorage(override val graph: Graph,
val principal = config.getString("principal")
val keytab = config.getString("keytab")
-
-
System.setProperty("java.security.auth.login.config", jaas)
System.setProperty("java.security.krb5.conf", krb5Conf)
// System.setProperty("sun.security.krb5.debug", "true")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
index e2b7c2f..2428173 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala
@@ -20,141 +20,125 @@
package org.apache.s2graph.core.storage.serde.indexedge.tall
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
-import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
+import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
import scala.collection.immutable
+object IndexEdgeDeserializable{
+ def getNewInstance() = new IndexEdgeDeserializable()
+}
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
import StorageDeserializable._
- type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
- type ValueRaw = (Array[(Byte, InnerValLike)], Int)
+ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
+ type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
- private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- // val degree = Bytes.toLong(kv.value)
- val degree = bytesToLongFunc(kv.value, 0)
- val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
- val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
- (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
- }
+ override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+ _kvs: Seq[T],
+ schemaVer: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+
+ assert(_kvs.size == 1)
+
+ // val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+ val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+// logger.debug(s"[DES]: ${kv.toLogString}}")
- private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- var qualifierLen = 0
+ val version = kv.timestamp
+ // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
var pos = 0
- val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
- val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
+ val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
+ pos += srcIdLen
+ val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
+ pos += 4
+ val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
+ pos += 1
+
+ val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+// val op = kv.row(pos)
+// pos += 1
+
+ if (pos == kv.row.length) {
+ // degree
+ // val degreeVal = Bytes.toLong(kv.value)
+ val degreeVal = bytesToLongFunc(kv.value, 0)
+ val ts = kv.timestamp
+ val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer)
+ val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs,
+ LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
+ val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+ IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+ } else {
+ // not degree edge
+ val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
pos = endAt
- qualifierLen += endAt
- val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+
+
+ val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length - 1) {
(HBaseType.defaultTgtVertexId, 0)
} else {
- TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
+ TargetVertexId.fromBytes(kv.row, endAt, kv.row.length - 1, schemaVer)
}
- qualifierLen += tgtVertexIdLen
- (props, endAt, tgtVertexId, tgtVertexIdLen)
- }
- val (op, opLen) =
- if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
- else (kv.qualifier(qualifierLen), 1)
+ val op = kv.row(kv.row.length-1)
+
+ val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
+ val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+ /** process indexProps */
+ val size = idxPropsRaw.length
+ (0 until size).foreach { ith =>
+ val meta = index.sortKeyTypesArray(ith)
+ val (k, v) = idxPropsRaw(ith)
+ if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+ else allProps += meta -> InnerValLikeWithTs(v, version)
+ }
+// for {
+// (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
+// } {
+// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+// else {
+// allProps += meta -> InnerValLikeWithTs(v, version)
+// }
+// }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else {
+ val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+ props.foreach { case (k, v) =>
+ allProps += (k -> InnerValLikeWithTs(v, version))
+ }
+ }
+ val _mergedProps = allProps.result()
+ val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
+ case None =>
+ val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
+ val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+ (mergedProps, tsInnerVal)
+ case Some(tsInnerVal) =>
+ (_mergedProps, tsInnerVal)
+ }
+// val mergedProps =
+// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
+// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
- qualifierLen += opLen
+ /** process tgtVertexId */
+ val tgtVertexId =
+ mergedProps.get(LabelMeta.to) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ }
- (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
- }
- private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
- (props, endAt)
- }
+ IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
- private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
- (Array.empty[(Byte, InnerValLike)], 0)
+ }
}
-
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
-
- assert(_kvs.size == 1)
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val version = kv.timestamp
- // logger.debug(s"[Des]: ${kv.row.toList}, ${kv.qualifier.toList}, ${kv.value.toList}")
- var pos = 0
- val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, schemaVer)
- pos += srcIdLen
- val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4))
- pos += 4
- val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos)
- pos += 1
-
- val op = kv.row(pos)
- pos += 1
-
- if (pos == kv.row.length) {
- // degree
- // val degreeVal = Bytes.toLong(kv.value)
- val degreeVal = bytesToLongFunc(kv.value, 0)
- val ts = kv.timestamp
- val props = Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(ts, ts, schemaVer),
- LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer))
- val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
- IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), labelWithDir, op, ts, labelIdxSeq, props)
- } else {
- // not degree edge
-
-
- val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer)
- pos = endAt
- val (tgtVertexIdRaw, tgtVertexIdLen) = if (endAt == kv.row.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.row, endAt, kv.row.length, schemaVer)
- }
-
- val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
- /* process indexProps */
- for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } {
- if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
- else allProps += seq -> InnerValLikeWithTs(v, version)
- }
-
- /* process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
- props.foreach { case (k, v) =>
- allProps += (k -> InnerValLikeWithTs(v, version))
- }
- }
- val _mergedProps = allProps.result()
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
- /* process tgtVertexId */
- val tgtVertexId =
- mergedProps.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
-
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
- }
- }
-}
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
index a76bd1f..cd242dc 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeSerializable.scala
@@ -21,16 +21,16 @@ package org.apache.s2graph.core.storage.serde.indexedge.tall
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.mysqls.LabelMeta
-import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
+import org.apache.s2graph.core.storage.{Serializable, StorageSerializable}
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.s2graph.core.storage.StorageSerializable._
-
-class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] {
import StorageSerializable._
- override val ts = indexEdge.version
- override val table = indexEdge.label.hbaseTableName.getBytes()
+ override def ts = indexEdge.version
+ override def table = indexEdge.label.hbaseTableName.getBytes()
def idxPropsMap = indexEdge.orders.toMap
def idxPropsBytes = propsToBytes(indexEdge.orders)
@@ -43,25 +43,25 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
val row = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
// logger.error(s"${row.toList}\n${srcIdBytes.toList}\n${labelWithDirBytes.toList}\n${labelIndexSeqWithIsInvertedBytes.toList}")
- val qualifier =
- if (indexEdge.degreeEdge) Array.empty[Byte]
- else
- idxPropsMap.get(LabelMeta.toSeq) match {
+ if (indexEdge.degreeEdge) row
+ else {
+ val qualifier = idxPropsMap.get(LabelMeta.to) match {
case None => Bytes.add(idxPropsBytes, VertexId.toTargetVertexId(indexEdge.tgtVertex.id).bytes)
case Some(vId) => idxPropsBytes
}
- /* TODO search usage of op byte. if there is no, then remove opByte */
- Bytes.add(row, Array.fill(1)(GraphUtil.defaultOpByte), qualifier)
+ val opByte = if (indexEdge.op == GraphUtil.operations("incrementCount")) indexEdge.op else GraphUtil.defaultOpByte
+ Bytes.add(row, qualifier, Array.fill(1)(opByte))
+ }
}
override def toQualifier: Array[Byte] = Array.empty[Byte]
override def toValue: Array[Byte] =
if (indexEdge.degreeEdge)
- Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+ longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
index eb3d765..534667b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala
@@ -19,122 +19,126 @@
package org.apache.s2graph.core.storage.serde.indexedge.wide
-import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core.mysqls.{Label, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.{GraphUtil, IndexEdge, QueryParam, Vertex}
-import scala.collection.immutable
-
-import scala.collection.immutable
-
+import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex}
import scala.collection.immutable
class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] {
-
-
- import StorageDeserializable._
-
- type QualifierRaw = (Array[(Byte, InnerValLike)], VertexId, Byte, Boolean, Int)
- type ValueRaw = (Array[(Byte, InnerValLike)], Int)
-
- private def parseDegreeQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- // val degree = Bytes.toLong(kv.value)
- val degree = bytesToLongFunc(kv.value, 0)
- val idxPropsRaw = Array(LabelMeta.degreeSeq -> InnerVal.withLong(degree, version))
- val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", version))
- (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
- }
-
- private def parseQualifier(kv: SKeyValue, version: String): QualifierRaw = {
- var qualifierLen = 0
- var pos = 0
- val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
- val (props, endAt) = bytesToProps(kv.qualifier, pos, version)
- pos = endAt
- qualifierLen += endAt
- val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
- (HBaseType.defaultTgtVertexId, 0)
- } else {
- TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, version)
- }
- qualifierLen += tgtVertexIdLen
- (props, endAt, tgtVertexId, tgtVertexIdLen)
- }
- val (op, opLen) =
- if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
- else (kv.qualifier(qualifierLen), 1)
-
- qualifierLen += opLen
-
- (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
- }
-
- private def parseValue(kv: SKeyValue, version: String): ValueRaw = {
- val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, version)
- (props, endAt)
- }
-
- private def parseDegreeValue(kv: SKeyValue, version: String): ValueRaw = {
- (Array.empty[(Byte, InnerValLike)], 0)
- }
-
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- schemaVer: String,
- cacheElementOpt: Option[IndexEdge]): IndexEdge = {
- assert(_kvs.size == 1)
-
- val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
-
- val kv = kvs.head
- val version = kv.timestamp
-
- val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
- (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
- }.getOrElse(parseRow(kv, schemaVer))
-
- val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
- if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
- else parseQualifier(kv, schemaVer)
-
- val allProps = immutable.Map.newBuilder[Byte, InnerValLikeWithTs]
- val index = queryParam.label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${queryParam.label.id.get}, ${labelIdxSeq}"))
-
- /* process indexProps */
- for {
- (seq, (k, v)) <- index.metaSeqs.zip(idxPropsRaw)
- } {
- if (k == LabelMeta.degreeSeq) allProps += k -> InnerValLikeWithTs(v, version)
- else allProps += seq -> InnerValLikeWithTs(v, version)
- }
-
- /* process props */
- if (op == GraphUtil.operations("incrementCount")) {
- // val countVal = Bytes.toLong(kv.value)
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.countSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else if (kv.qualifier.isEmpty) {
- val countVal = bytesToLongFunc(kv.value, 0)
- allProps += (LabelMeta.degreeSeq -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
- } else {
- val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer)
- props.foreach { case (k, v) => allProps += (k -> InnerValLikeWithTs(v, version)) }
- }
-
- val _mergedProps = allProps.result()
- val mergedProps =
- if (_mergedProps.contains(LabelMeta.timeStampSeq)) _mergedProps
- else _mergedProps + (LabelMeta.timeStampSeq -> InnerValLikeWithTs.withLong(version, version, schemaVer))
-
- /* process tgtVertexId */
- val tgtVertexId =
- mergedProps.get(LabelMeta.toSeq) match {
- case None => tgtVertexIdRaw
- case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
- }
-
- IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), labelWithDir, op, version, labelIdxSeq, mergedProps)
-
- }
-}
+ import StorageDeserializable._
+
+ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int)
+ type ValueRaw = (Array[(LabelMeta, InnerValLike)], Int)
+
+ private def parseDegreeQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = {
+ // val degree = Bytes.toLong(kv.value)
+ val degree = bytesToLongFunc(kv.value, 0)
+ val idxPropsRaw = Array(LabelMeta.degree -> InnerVal.withLong(degree, schemaVer))
+ val tgtVertexIdRaw = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer))
+ (idxPropsRaw, tgtVertexIdRaw, GraphUtil.operations("insert"), false, 0)
+ }
+
+ private def parseQualifier(kv: SKeyValue, schemaVer: String): QualifierRaw = {
+ var qualifierLen = 0
+ var pos = 0
+ val (idxPropsRaw, idxPropsLen, tgtVertexIdRaw, tgtVertexIdLen) = {
+ val (props, endAt) = bytesToProps(kv.qualifier, pos, schemaVer)
+ pos = endAt
+ qualifierLen += endAt
+ val (tgtVertexId, tgtVertexIdLen) = if (endAt == kv.qualifier.length) {
+ (HBaseType.defaultTgtVertexId, 0)
+ } else {
+ TargetVertexId.fromBytes(kv.qualifier, endAt, kv.qualifier.length, schemaVer)
+ }
+ qualifierLen += tgtVertexIdLen
+ (props, endAt, tgtVertexId, tgtVertexIdLen)
+ }
+ val (op, opLen) =
+ if (kv.qualifier.length == qualifierLen) (GraphUtil.defaultOpByte, 0)
+ else (kv.qualifier(qualifierLen), 1)
+
+ qualifierLen += opLen
+
+ (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdLen != 0, qualifierLen)
+ }
+
+ override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+ _kvs: Seq[T],
+ schemaVer: String,
+ cacheElementOpt: Option[IndexEdge]): IndexEdge = {
+ assert(_kvs.size == 1)
+
+// val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
+
+ val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head)
+ val version = kv.timestamp
+
+ val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e =>
+ (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0)
+ }.getOrElse(parseRow(kv, schemaVer))
+
+ val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId))
+
+ val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) =
+ if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer)
+ else parseQualifier(kv, schemaVer)
+
+ val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs]
+ val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}"))
+
+ /** process indexProps */
+ val size = idxPropsRaw.length
+ (0 until size).foreach { ith =>
+ val meta = index.sortKeyTypesArray(ith)
+ val (k, v) = idxPropsRaw(ith)
+ if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+ else allProps += meta -> InnerValLikeWithTs(v, version)
+ }
+// for {
+// (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw)
+// } {
+// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version)
+// else allProps += seq -> InnerValLikeWithTs(v, version)
+// }
+
+ /** process props */
+ if (op == GraphUtil.operations("incrementCount")) {
+ // val countVal = Bytes.toLong(kv.value)
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else if (kv.qualifier.isEmpty) {
+ val countVal = bytesToLongFunc(kv.value, 0)
+ allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer))
+ } else {
+ val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label)
+ props.foreach { case (k, v) =>
+ allProps += (k -> InnerValLikeWithTs(v, version))
+ }
+ }
+
+ val _mergedProps = allProps.result()
+ val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match {
+ case None =>
+ val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer)
+ val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+ (mergedProps, tsInnerVal)
+ case Some(tsInnerVal) =>
+ (_mergedProps, tsInnerVal)
+ }
+// val mergedProps =
+// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps
+// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer))
+
+ /** process tgtVertexId */
+ val tgtVertexId =
+ mergedProps.get(LabelMeta.to) match {
+ case None => tgtVertexIdRaw
+ case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal)
+ }
+
+ IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal))
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
index c700e53..211b159 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeSerializable.scala
@@ -24,12 +24,13 @@ import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.storage.{SKeyValue, Serializable, StorageSerializable}
import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.{GraphUtil, IndexEdge}
+import org.apache.s2graph.core.storage.StorageSerializable._
-class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge] {
+class IndexEdgeSerializable(indexEdge: IndexEdge, longToBytes: Long => Array[Byte] = longToBytes) extends Serializable[IndexEdge] {
import StorageSerializable._
- override val ts = indexEdge.version
- override val table = indexEdge.label.hbaseTableName.getBytes()
+ override def ts = indexEdge.version
+ override def table = indexEdge.label.hbaseTableName.getBytes()
def idxPropsMap = indexEdge.orders.toMap
def idxPropsBytes = propsToBytes(indexEdge.orders)
@@ -49,7 +50,7 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
if (indexEdge.op == GraphUtil.operations("incrementCount")) {
Bytes.add(idxPropsBytes, tgtIdBytes, Array.fill(1)(indexEdge.op))
} else {
- idxPropsMap.get(LabelMeta.toSeq) match {
+ idxPropsMap.get(LabelMeta.to) match {
case None => Bytes.add(idxPropsBytes, tgtIdBytes)
case Some(vId) => idxPropsBytes
}
@@ -59,9 +60,9 @@ class IndexEdgeSerializable(indexEdge: IndexEdge) extends Serializable[IndexEdge
override def toValue: Array[Byte] =
if (indexEdge.degreeEdge)
- Bytes.toBytes(indexEdge.props(LabelMeta.degreeSeq).innerVal.toString().toLong)
+ longToBytes(indexEdge.props(LabelMeta.degree).innerVal.toString().toLong)
else if (indexEdge.op == GraphUtil.operations("incrementCount"))
- Bytes.toBytes(indexEdge.props(LabelMeta.countSeq).innerVal.toString().toLong)
+ longToBytes(indexEdge.props(LabelMeta.count).innerVal.toString().toLong)
else propsToKeyValues(indexEdge.metas.toSeq)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
index 368e3f3..91b8db1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@
package org.apache.s2graph.core.storage.serde.snapshotedge.tall
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable}
import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId}
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
(statusCode.toByte, op.toByte)
}
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
_kvs: Seq[T],
version: String,
cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,9 +42,10 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
assert(kvs.size == 1)
val kv = kvs.head
- val schemaVer = queryParam.label.schemaVersion
+ val label = checkLabel.get
+ val schemaVer = label.schemaVersion
val cellVersion = kv.timestamp
- /* rowKey */
+ /** rowKey */
def parseRowV3(kv: SKeyValue, version: String) = {
var pos = 0
val (srcIdAndTgtId, srcIdAndTgtIdLen) = SourceAndTargetVertexIdPair.fromBytes(kv.row, pos, kv.row.length, version)
@@ -64,13 +65,14 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
val srcVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, srcInnerId)
val tgtVertexId = SourceVertexId(HBaseType.DEFAULT_COL_ID, tgtInnerId)
- val (props, op, ts, statusCode, _pendingEdgeOpt) = {
+ val (props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
var pos = 0
val (statusCode, op) = statusCodeWithOp(kv.value(pos))
pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
val kvsMap = props.toMap
- val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+ val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+ val ts = tsInnerVal.toString.toLong
pos = endAt
val _pendingEdgeOpt =
@@ -80,24 +82,24 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
pos += 1
// val versionNum = Bytes.toLong(kv.value, pos, 8)
// pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
pos = endAt
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
Edge(Vertex(srcVertexId, cellVersion),
Vertex(tgtVertexId, cellVersion),
- labelWithDir, pendingEdgeOp,
+ label, labelWithDir.dir, pendingEdgeOp,
cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
Option(pendingEdge)
}
- (kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+ (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
}
SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
- labelWithDir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+ label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
index 4f7c17b..fc84469 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala
@@ -29,8 +29,8 @@ import org.apache.s2graph.core.types.SourceAndTargetVertexIdPair
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
import StorageSerializable._
- override val ts = snapshotEdge.version
- override val table = snapshotEdge.label.hbaseTableName.getBytes()
+ override def ts = snapshotEdge.version
+ override def table = snapshotEdge.label.hbaseTableName.getBytes()
def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
val byte = (((statusCode << 4) | op).toByte)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
index 8aca2cf..8d95e77 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala
@@ -20,11 +20,11 @@
package org.apache.s2graph.core.storage.serde.snapshotedge.wide
import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.mysqls.{LabelIndex, LabelMeta}
+import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable}
import org.apache.s2graph.core.types.TargetVertexId
-import org.apache.s2graph.core.{Edge, QueryParam, SnapshotEdge, Vertex}
+import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex}
class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
@@ -34,7 +34,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
(statusCode.toByte, op.toByte)
}
- override def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
+ override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
_kvs: Seq[T],
version: String,
cacheElementOpt: Option[SnapshotEdge]): SnapshotEdge = {
@@ -42,21 +42,23 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
assert(kvs.size == 1)
val kv = kvs.head
- val schemaVer = queryParam.label.schemaVersion
+ val label = checkLabel.get
+ val schemaVer = label.schemaVersion
val cellVersion = kv.timestamp
val (srcVertexId, labelWithDir, _, _, _) = cacheElementOpt.map { e =>
(e.srcVertex.id, e.labelWithDir, LabelIndex.DefaultSeq, true, 0)
}.getOrElse(parseRow(kv, schemaVer))
- val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt) = {
+ val (tgtVertexId, props, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) = {
val (tgtVertexId, _) = TargetVertexId.fromBytes(kv.qualifier, 0, kv.qualifier.length, schemaVer)
var pos = 0
val (statusCode, op) = statusCodeWithOp(kv.value(pos))
pos += 1
- val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val (props, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
val kvsMap = props.toMap
- val ts = kvsMap(LabelMeta.timeStampSeq).innerVal.toString.toLong
+ val tsInnerVal = kvsMap(LabelMeta.timestamp).innerVal
+ val ts = tsInnerVal.toString.toLong
pos = endAt
val _pendingEdgeOpt =
@@ -66,24 +68,24 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] {
pos += 1
// val versionNum = Bytes.toLong(kv.value, pos, 8)
// pos += 8
- val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer)
+ val (pendingEdgeProps, endAt) = bytesToKeyValuesWithTs(kv.value, pos, schemaVer, label)
pos = endAt
val lockTs = Option(Bytes.toLong(kv.value, pos, 8))
val pendingEdge =
Edge(Vertex(srcVertexId, cellVersion),
Vertex(tgtVertexId, cellVersion),
- labelWithDir, pendingEdgeOp,
+ label, labelWithDir.dir, pendingEdgeOp,
cellVersion, pendingEdgeProps.toMap,
- statusCode = pendingEdgeStatusCode, lockTs = lockTs)
+ statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))
Option(pendingEdge)
}
- (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt)
+ (tgtVertexId, kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal)
}
SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts),
- labelWithDir, op, cellVersion, props, statusCode = statusCode,
- pendingEdgeOpt = _pendingEdgeOpt, lockTs = None)
+ label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode,
+ pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
index 757ef1b..4ceb4a8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala
@@ -34,8 +34,8 @@ import org.apache.s2graph.core.types.VertexId
class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[SnapshotEdge] {
import StorageSerializable._
- override val ts = snapshotEdge.version
- override val table = snapshotEdge.label.hbaseTableName.getBytes()
+ override def ts = snapshotEdge.version
+ override def table = snapshotEdge.label.hbaseTableName.getBytes()
def statusCodeWithOp(statusCode: Byte, op: Byte): Array[Byte] = {
val byte = (((statusCode << 4) | op).toByte)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
index 737c2a8..3ec17ab 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexDeserializable.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,18 +19,19 @@
package org.apache.s2graph.core.storage.serde.vertex
-import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.Label
+import org.apache.s2graph.core.storage.StorageDeserializable._
import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable}
import org.apache.s2graph.core.types.{InnerVal, InnerValLike, VertexId}
import org.apache.s2graph.core.{QueryParam, Vertex}
import scala.collection.mutable.ListBuffer
-class VertexDeserializable extends Deserializable[Vertex] {
- def fromKeyValuesInner[T: CanSKeyValue](queryParam: QueryParam,
- _kvs: Seq[T],
- version: String,
- cacheElementOpt: Option[Vertex]): Vertex = {
+class VertexDeserializable(bytesToInt: (Array[Byte], Int) => Int = bytesToInt) extends Deserializable[Vertex] {
+ def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label],
+ _kvs: Seq[T],
+ version: String,
+ cacheElementOpt: Option[Vertex]): Vertex = {
val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
@@ -46,7 +47,7 @@ class VertexDeserializable extends Deserializable[Vertex] {
} {
val propKey =
if (kv.qualifier.length == 1) kv.qualifier.head.toInt
- else Bytes.toInt(kv.qualifier)
+ else bytesToInt(kv.qualifier, 0)
val ts = kv.timestamp
if (ts > maxTs) maxTs = ts
@@ -63,4 +64,3 @@ class VertexDeserializable extends Deserializable[Vertex] {
Vertex(vertexId, maxTs, propsMap.toMap, belongLabelIds = belongLabelIds)
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
index 6bb162c..77bbb87 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/vertex/VertexSerializable.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,11 +19,12 @@
package org.apache.s2graph.core.storage.serde.vertex
-import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core.Vertex
+import org.apache.s2graph.core.storage.StorageSerializable._
import org.apache.s2graph.core.storage.{SKeyValue, Serializable}
+import org.apache.s2graph.core.utils.logger
-case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
+case class VertexSerializable(vertex: Vertex, intToBytes: Int => Array[Byte] = intToBytes) extends Serializable[Vertex] {
override val table = vertex.hbaseTableName.getBytes
override val ts = vertex.ts
@@ -37,10 +38,10 @@ case class VertexSerializable(vertex: Vertex) extends Serializable[Vertex] {
/** vertex override toKeyValues since vertex expect to produce multiple sKeyValues */
override def toKeyValues: Seq[SKeyValue] = {
val row = toRowKey
- val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield Bytes.toBytes(k) -> v.bytes
- val belongsTo = vertex.belongLabelIds.map { labelId => Bytes.toBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
+ val base = for ((k, v) <- vertex.props ++ vertex.defaultProps) yield intToBytes(k) -> v.bytes
+ val belongsTo = vertex.belongLabelIds.map { labelId => intToBytes(Vertex.toPropKey(labelId)) -> Array.empty[Byte] }
(base ++ belongsTo).map { case (qualifier, value) =>
SKeyValue(vertex.hbaseTableName.getBytes, row, cf, qualifier, value, vertex.ts)
} toSeq
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/66bdf1bc/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
index c03319d..b885bc6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/types/HBaseType.scala
@@ -20,6 +20,7 @@
package org.apache.s2graph.core.types
import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.mysqls.LabelMeta
object HBaseType {
val VERSION4 = "v4"
@@ -28,7 +29,7 @@ object HBaseType {
val VERSION1 = "v1"
// val DEFAULT_VERSION = VERSION2
val DEFAULT_VERSION = VERSION3
- val EMPTY_SEQ_BYTE = Byte.MaxValue
+// val EMPTY_SEQ_BYTE = Byte.MaxValue
val DEFAULT_COL_ID = 0
val bitsForDir = 2
val maxBytes = Bytes.toBytes(Int.MaxValue)
@@ -100,7 +101,7 @@ object HBaseDeserializable {
val kvs = new Array[(Byte, InnerValLike)](len)
var i = 0
while (i < len) {
- val k = EMPTY_SEQ_BYTE
+ val k = LabelMeta.emptySeq
val (v, numOfBytesUsed) = InnerVal.fromBytes(bytes, pos, 0, version)
pos += numOfBytesUsed
kvs(i) = (k -> v)