Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:43 UTC
[03/11] incubator-s2graph git commit: separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...
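
For orientation, the small interfaces this patch introduces look roughly as follows. This is a sketch reconstructed from the implementing classes in the diff below; the trait declarations themselves are not part of this hunk, so the exact signatures are assumptions.

    // Approximate shape of the new fine-grained storage interfaces,
    // reconstructed from the implementations in this patch.
    trait EdgeFetcher {
      def fetches(queryRequests: Seq[QueryRequest],
                  prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])
                 (implicit ec: ExecutionContext): Future[Seq[StepResult]]
    }

    trait VertexFetcher {
      def fetchVertices(vertices: Seq[S2VertexLike])
                       (implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
    }

    trait OptimisticMutator {
      def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)
                        (implicit ec: ExecutionContext): Future[MutateResponse]
      def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])
                   (implicit ec: ExecutionContext): Future[MutateResponse]
    }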
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
new file mode 100644
index 0000000..4239d15
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+
+import com.stumbleupon.async.Deferred
+import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
+import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
+import org.hbase.async._
+
+import scala.concurrent.ExecutionContext
+
+class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val client: HBaseClient,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends EdgeFetcher {
+
+ import AsynchbaseStorage._
+ import CanDefer._
+ import Extensions.DeferOps
+
+ import scala.collection.JavaConverters._
+
+ /** Future cache used to squash duplicate in-flight requests */
+ lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
+
+ override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = {
+ val defers: Seq[Deferred[StepResult]] = for {
+ queryRequest <- queryRequests
+ } yield {
+ val queryOption = queryRequest.query.queryOption
+ val queryParam = queryRequest.queryParam
+ val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
+ val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
+ fetch(queryRequest, isInnerCall = false, parentEdges)
+ }
+
+ val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers.asJava)
+ grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
+ queryResults
+ }.toFuture(emptyStepResult).map(_.asScala)
+ }
+
+ /**
+ * we use a future cache to squash requests for the same key on storage.
+ *
+ * @param queryRequest
+ * @param isInnerCall
+ * @param parentEdges
+ * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future;
+ * map and flatMap on scala.concurrent.Future seem to be slower than Deferred's addCallback.
+ */
+ private def fetch(queryRequest: QueryRequest,
+ isInnerCall: Boolean,
+ parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = {
+
+ def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
+ val prevStepScore = queryRequest.prevStepScore
+ val fallbackFn: (Exception => StepResult) = { ex =>
+ logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
+ StepResult.Failure
+ }
+
+ val queryParam = queryRequest.queryParam
+ AsynchbaseStorage.fetchKeyValuesInner(client, hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { _kvs =>
+ val kvs = _kvs.asScala
+ val (startOffset, len) = queryParam.label.schemaVersion match {
+ case HBaseType.VERSION4 =>
+ val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
+ (offset, queryParam.limit)
+ case _ => (0, kvs.length)
+ }
+
+ io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
+ }
+ }
+
+ val queryParam = queryRequest.queryParam
+ val cacheTTL = queryParam.cacheTTLInMillis
+ /* with version 4, the request's type is (Scanner, (Int, Int)); otherwise it is a GetRequest. */
+
+ val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
+ val request = buildRequest(client, serDe, queryRequest, edge)
+
+ val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+ val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
+
+ if (cacheTTL <= 0) fetchInner(request)
+ else {
+ val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
+
+ // val cacheKeyBytes = toCacheKeyBytes(request)
+ val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+ futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+ }
+ }
+}
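
The future cache above exists to squash concurrent requests: while a fetch for a given cache key is in flight, later callers share the same pending result instead of issuing another HBase RPC. A minimal sketch of that squashing idea, assuming a simplified cache with no TTL handling (DeferCache additionally expires entries after cacheTTL and records metrics):

    import java.util.concurrent.ConcurrentHashMap
    import scala.concurrent.{ExecutionContext, Future}

    // Minimal request-squashing cache: the first caller for a key starts
    // the fetch; callers arriving while it is in flight reuse the Future.
    class SquashingCache[K, V] {
      private val inFlight = new ConcurrentHashMap[K, Future[V]]()

      def getOrElseUpdate(key: K)(fetch: => Future[V])
                         (implicit ec: ExecutionContext): Future[V] = {
        val f = inFlight.computeIfAbsent(key, _ => fetch)
        // drop the entry once resolved; a real cache would keep it for TTL
        f.onComplete(_ => inFlight.remove(key, f))
        f
      }
    }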
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..f03310c
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticEdgeFetcher.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.s2graph.core.{QueryRequest, S2EdgeLike}
+import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseOptimisticEdgeFetcher(val client: HBaseClient,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends OptimisticEdgeFetcher {
+ override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ val request = AsynchbaseStorage.buildRequest(client, serDe, queryRequest, edge)
+ AsynchbaseStorage.fetchKeyValues(client, request)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
new file mode 100644
index 0000000..8305e04
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseOptimisticMutator.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.utils.{Extensions, logger}
+import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseOptimisticMutator(val graph: S2GraphLike,
+ val serDe: StorageSerDe,
+ val optimisticEdgeFetcher: OptimisticEdgeFetcher,
+ val client: HBaseClient,
+ val clientWithFlush: HBaseClient) extends OptimisticMutator {
+ import Extensions.DeferOps
+
+ private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+ /**
+ * decide how to store the given key values (Seq[SKeyValue]) into storage using the storage's client.
+ * note that this should return true only when all writes succeed.
+ * we assume that each storage implementation has a client as a member variable.
+ *
+ * @param cluster : where these key values should be stored.
+ * @param kvs : sequence of SKeyValue that needs to be stored in storage.
+ * @param withWait : flag to control whether to wait for an ack from storage.
+ * note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
+ * it never blocks a thread; it submits the work and is notified by the event loop when storage sends the ack back.
+ * @return ack message from storage.
+ */
+ override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
+ if (kvs.isEmpty) Future.successful(MutateResponse.Success)
+ else {
+ val _client = client(withWait)
+ val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
+
+ /* Asynchbase IncrementRequest does not implement HasQualifiers */
+ val incrementsFutures = increments.map { kv =>
+ val countVal = Bytes.toLong(kv.value)
+ val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal)
+ val fallbackFn: (Exception => MutateResponse) = { ex =>
+ logger.error(s"mutation failed. $request", ex)
+ new IncrementResponse(false, -1L, -1L)
+ }
+ val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
+ new IncrementResponse(true, resultCount.longValue(), countVal)
+ }.toFuture(MutateResponse.IncrementFailure)
+
+ if (withWait) future else Future.successful(MutateResponse.IncrementSuccess)
+ }
+
+ /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
+ val othersFutures = putAndDeletes.groupBy { kv =>
+ (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
+ }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
+
+ val durability = groupedKeyValues.head.durability
+ val qualifiers = new ArrayBuffer[Array[Byte]]()
+ val values = new ArrayBuffer[Array[Byte]]()
+
+ groupedKeyValues.foreach { kv =>
+ if (kv.qualifier != null) qualifiers += kv.qualifier
+ if (kv.value != null) values += kv.value
+ }
+ val defer = operation match {
+ case SKeyValue.Put =>
+ val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
+ put.setDurable(durability)
+ _client.put(put)
+ case SKeyValue.Delete =>
+ val delete =
+ if (qualifiers.isEmpty)
+ new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
+ else
+ new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
+ delete.setDurable(durability)
+ _client.delete(delete)
+ }
+ if (withWait) {
+ defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception =>
+ groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
+ MutateResponse.Failure
+ }
+ } else Future.successful(MutateResponse.Success)
+ }
+ for {
+ incrementRets <- Future.sequence(incrementsFutures)
+ otherRets <- Future.sequence(othersFutures)
+ } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess))
+ }
+ }
+
+ /**
+ * write requestKeyValue into storage only if the value currently stored matches the expected one.
+ * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+ *
+ * Most importantly, this has to be an 'atomic' operation.
+ * While this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
+ * either blocked or failed in the write-write conflict case.
+ *
+ * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized with it to
+ * prevent reads from returning wrong data.
+ *
+ * Best is to use the storage's concurrency control (either pessimistic or optimistic), such as transactions or
+ * compareAndSet, to synchronize.
+ *
+ * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+ * for a storage that does not support concurrency control, the storage implementation
+ * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
+ * and write (writeLock).
+ *
+ * @param rpc
+ * @param expectedOpt
+ * @return
+ */
+ override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
+ val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
+ client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
+ .map(r => new MutateResponse(isSuccess = r))
+ }
+}
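
The writeLock contract above boils down to a compare-and-set over the SnapshotEdge bytes: the put is applied only if what is currently stored equals what the caller expected (HBase's CheckAndPut performs this check server-side). A toy in-memory illustration of the same semantics, assuming a single cell in place of an HBase row:

    import java.util.concurrent.atomic.AtomicReference

    // Toy compare-and-set over one snapshot value, mirroring writeLock:
    // the write succeeds only if the stored bytes still match `expected`.
    class ToySnapshotCell {
      private val cell = new AtomicReference[Array[Byte]](Array.emptyByteArray)

      def writeLock(next: Array[Byte], expected: Array[Byte]): Boolean = {
        val current = cell.get()
        // compare contents; AtomicReference's CAS alone compares references
        java.util.Arrays.equals(current, expected) && cell.compareAndSet(current, next)
      }
    }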
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 4be3767..f65ee20 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -21,25 +21,40 @@ package org.apache.s2graph.core.storage.hbase
import java.util
+import java.util.Base64
import java.util.concurrent.{ExecutorService, Executors}
+import com.stumbleupon.async.Deferred
import com.typesafe.config.Config
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.storage.serde._
+import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.utils._
+import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
import org.hbase.async._
-import org.apache.s2graph.core.storage.serde._
+
import scala.collection.JavaConversions._
+import scala.concurrent.{ExecutionContext, Future}
object AsynchbaseStorage {
+ import Extensions.DeferOps
+ import CanDefer._
+
val vertexCf = Serializable.vertexCf
val edgeCf = Serializable.edgeCf
+
val emptyKVs = new util.ArrayList[KeyValue]()
+ val emptyKeyValues = new util.ArrayList[KeyValue]()
+ val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
+ val emptyStepResult = new util.ArrayList[StepResult]()
AsynchbasePatcher.init()
+
def makeClient(config: Config, overrideKv: (String, String)*) = {
val asyncConfig: org.hbase.async.Config =
if (config.hasPath("hbase.security.auth.enable") && config.getBoolean("hbase.security.auth.enable")) {
@@ -135,6 +150,161 @@ object AsynchbaseStorage {
hbaseExecutor
}
+
+ def fetchKeyValues(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ val defer = fetchKeyValuesInner(client, rpc)
+ defer.toFuture(emptyKeyValues).map { kvsArr =>
+ kvsArr.map { kv =>
+ implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+ }
+ }
+ }
+
+ def fetchKeyValuesInner(client: HBaseClient, rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = {
+ rpc match {
+ case Left(get) => client.get(get)
+ case Right(ScanWithRange(scanner, offset, limit)) =>
+ val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex =>
+ logger.error(s"fetchKeyValuesInner failed.", ex)
+ scanner.close()
+ emptyKeyValues
+ }
+
+ scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs =>
+ val ls = new util.ArrayList[KeyValue]
+ if (kvsLs != null) {
+ kvsLs.foreach { kvs =>
+ if (kvs != null) kvs.foreach { kv => ls.add(kv) }
+ }
+ }
+
+ scanner.close()
+ val toIndex = Math.min(ls.size(), offset + limit)
+ new util.ArrayList[KeyValue](ls.subList(offset, toIndex))
+ }
+ case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
+ }
+ }
+
+ def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
+ /* with version 4, the request's type is (Scanner, (Int, Int)); otherwise it is a GetRequest. */
+ hbaseRpc match {
+ case Left(getRequest) => getRequest.key
+ case Right(ScanWithRange(scanner, offset, limit)) =>
+ Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)))
+ case _ =>
+ logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
+ throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
+ }
+ }
+
+ def buildRequest(serDe: StorageSerDe, queryRequest: QueryRequest, vertex: S2VertexLike) = {
+ val kvs = serDe.vertexSerializer(vertex).toKeyValues
+ val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
+ // get.setTimeout(this.singleGetTimeout.toShort)
+ get.setFailfast(true)
+ get.maxVersions(1)
+
+ Left(get)
+ }
+
+ def buildRequest(client: HBaseClient, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike) = {
+ val queryParam = queryRequest.queryParam
+ val label = queryParam.label
+
+ val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+ val snapshotEdge = edge.toSnapshotEdge
+ serDe.snapshotEdgeSerializer(snapshotEdge)
+ } else {
+ val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
+ serDe.indexEdgeSerializer(indexEdge)
+ }
+
+ val rowKey = serializer.toRowKey
+ val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
+
+ val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+
+ label.schemaVersion match {
+ case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
+ val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
+ scanner.setFamily(SKeyValue.EdgeCf)
+
+ /*
+ * TODO: remove this part.
+ */
+ val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
+ val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
+
+ val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+ val labelWithDirBytes = indexEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+ val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+ val (startKey, stopKey) =
+ if (queryParam.intervalOpt.isDefined) {
+ // interval is set.
+ val _startKey = queryParam.cursorOpt match {
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
+ case None => Bytes.add(baseKey, intervalMaxBytes)
+ }
+ (_startKey , Bytes.add(baseKey, intervalMinBytes))
+ } else {
+ /*
+ * note: since propsToBytes encodes the size of the property map in its first byte, we can be sure about the max value here
+ */
+ val _startKey = queryParam.cursorOpt match {
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
+ case None => baseKey
+ }
+ (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+ }
+
+ scanner.setStartKey(startKey)
+ scanner.setStopKey(stopKey)
+
+ if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
+
+ scanner.setMaxVersions(1)
+ // TODO: exclusive condition innerOffset with cursorOpt
+ if (queryParam.cursorOpt.isDefined) {
+ scanner.setMaxNumRows(queryParam.limit)
+ } else {
+ scanner.setMaxNumRows(queryParam.innerOffset + queryParam.innerLimit)
+ }
+ scanner.setMaxTimestamp(maxTs)
+ scanner.setMinTimestamp(minTs)
+ scanner.setRpcTimeout(queryParam.rpcTimeout)
+
+ // SET option for this rpc properly.
+ if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
+ else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
+
+ case _ =>
+ val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
+ new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf, serializer.toQualifier)
+ } else {
+ new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf)
+ }
+
+ get.maxVersions(1)
+ get.setFailfast(true)
+ get.setMinTimestamp(minTs)
+ get.setMaxTimestamp(maxTs)
+ get.setTimeout(queryParam.rpcTimeout)
+
+ val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
+ val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
+ new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
+ }
+ get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
+ Left(get)
+ }
+ }
}
@@ -149,11 +319,21 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
val clientWithFlush = AsynchbaseStorage.makeClient(config, "hbase.rpcs.buffered_flush_interval" -> "0")
val clients = Seq(client, clientWithFlush)
- override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
+// private lazy val _fetcher = new AsynchbaseStorageFetcher(graph, config, client, serDe, io)
+
+ private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io)
+ private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush)
+ private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
+ override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
- override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
+ override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io)
+ override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io)
+ override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io)
+ override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io)
+
+ override val edgeMutator: EdgeMutator = _mutator
+ override val vertexMutator: VertexMutator = _mutator
- override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush)
}
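
In the VERSION4 scan branch of buildRequest above, the scan range is simply the row prefix (source vertex id + label/direction + index) with the interval bounds appended, or the Base64-decoded cursor when paging. A schematic of that choice; the names mirror the code above, but the actual byte encodings come from the serializers and are not reproduced here:

    import java.util.Base64
    import org.apache.hadoop.hbase.util.Bytes

    // Schematic of the VERSION4 (startKey, stopKey) selection.
    def scanRange(baseKey: Array[Byte],
                  intervalOpt: Option[(Array[Byte], Array[Byte])], // (maxBytes, minBytes)
                  cursorOpt: Option[String]): (Array[Byte], Array[Byte]) = {
      val stopKey = intervalOpt match {
        case Some((_, minBytes)) => Bytes.add(baseKey, minBytes)          // interval fence
        case None                => Bytes.add(baseKey, Array.fill(1)(-1)) // 0xFF after prefix
      }
      val startKey = cursorOpt match {
        case Some(cursor) => Base64.getDecoder.decode(cursor) // resume from cursor
        case None         => intervalOpt.fold(baseKey) { case (maxBytes, _) => Bytes.add(baseKey, maxBytes) }
      }
      (startKey, stopKey)
    }

Note that the logical max forms the start key and the min the stop key, which implies the serialized interval bytes sort in descending logical order.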
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
deleted file mode 100644
index 0526042..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- *
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import java.util
-import java.util.Base64
-
-import com.stumbleupon.async.Deferred
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.{Label, ServiceColumn}
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.storage.serde._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.{AsyncRPC, ScanWithRange}
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
-import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
-import org.hbase.async.FilterList.Operator.MUST_PASS_ALL
-import org.hbase.async._
-
-import scala.collection.JavaConversions._
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseStorageReadable(val graph: S2GraphLike,
- val config: Config,
- val client: HBaseClient,
- override val serDe: StorageSerDe,
- override val io: StorageIO) extends StorageReadable {
- import Extensions.DeferOps
- import CanDefer._
-
- private val emptyKeyValues = new util.ArrayList[KeyValue]()
- private val emptyKeyValuesLs = new util.ArrayList[util.ArrayList[KeyValue]]()
- private val emptyStepResult = new util.ArrayList[StepResult]()
-
- /** Future Cache to squash request */
- lazy private val futureCache = new DeferCache[StepResult, Deferred, Deferred](config, StepResult.Empty, "AsyncHbaseFutureCache", useMetric = true)
- /** v4 max next row size */
- private val v4_max_num_rows = 10000
- private def getV4MaxNumRows(limit : Int): Int = {
- if (limit < v4_max_num_rows) limit
- else v4_max_num_rows
- }
-
- /**
- * build a proper storage-specific request to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
- * for example, Asynchbase uses GetRequest and Scanner, so this method is responsible for building the
- * client request (GetRequest, Scanner) based on the user-provided query.
- *
- * @param queryRequest
- * @return
- */
- private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike) = {
- import Serializable._
- val queryParam = queryRequest.queryParam
- val label = queryParam.label
-
- val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
- val snapshotEdge = edge.toSnapshotEdge
- serDe.snapshotEdgeSerializer(snapshotEdge)
- } else {
- val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
- serDe.indexEdgeSerializer(indexEdge)
- }
-
- val rowKey = serializer.toRowKey
- val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
-
- val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
-
- label.schemaVersion match {
- case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
- val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
- scanner.setFamily(edgeCf)
-
- /*
- * TODO: remove this part.
- */
- val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
- val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
-
- val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
- val labelWithDirBytes = indexEdge.labelWithDir.bytes
- val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
- val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
- val (startKey, stopKey) =
- if (queryParam.intervalOpt.isDefined) {
- // interval is set.
- val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Base64.getDecoder.decode(cursor)
- case None => Bytes.add(baseKey, intervalMaxBytes)
- }
- (_startKey , Bytes.add(baseKey, intervalMinBytes))
- } else {
- /*
- * note: since propsToBytes encodes the size of the property map in its first byte, we can be sure about the max value here
- */
- val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Base64.getDecoder.decode(cursor)
- case None => baseKey
- }
- (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
- }
-
- scanner.setStartKey(startKey)
- scanner.setStopKey(stopKey)
-
- if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
-
- scanner.setMaxVersions(1)
- // TODO: exclusive condition innerOffset with cursorOpt
- if (queryParam.cursorOpt.isDefined) {
- scanner.setMaxNumRows(getV4MaxNumRows(queryParam.limit))
- } else {
- scanner.setMaxNumRows(getV4MaxNumRows(queryParam.innerOffset + queryParam.innerLimit))
- }
- scanner.setMaxTimestamp(maxTs)
- scanner.setMinTimestamp(minTs)
- scanner.setRpcTimeout(queryParam.rpcTimeout)
-
- // SET option for this rpc properly.
- if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
- else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
-
- case _ =>
- val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
- new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf, serializer.toQualifier)
- } else {
- new GetRequest(label.hbaseTableName.getBytes, rowKey, edgeCf)
- }
-
- get.maxVersions(1)
- get.setFailfast(true)
- get.setMinTimestamp(minTs)
- get.setMaxTimestamp(maxTs)
- get.setTimeout(queryParam.rpcTimeout)
-
- val pagination = new ColumnPaginationFilter(queryParam.limit, queryParam.offset)
- val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
- new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
- }
- get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
- Left(get)
- }
- }
-
- /**
- *
- * @param queryRequest
- * @param vertex
- * @return
- */
- private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike) = {
- val kvs = serDe.vertexSerializer(vertex).toKeyValues
- val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
- // get.setTimeout(this.singleGetTimeout.toShort)
- get.setFailfast(true)
- get.maxVersions(1)
-
- Left(get)
- }
-
- override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
- val rpc = buildRequest(queryRequest, edge)
- fetchKeyValues(rpc)
- }
-
- override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
- val rpc = buildRequest(queryRequest, vertex)
- fetchKeyValues(rpc)
- }
-
- /**
- * responsible for firing parallel fetch calls into storage and creating a future that returns the merged result.
- *
- * @param queryRequests
- * @param prevStepEdges
- * @return
- */
- override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext) = {
- val defers: Seq[Deferred[StepResult]] = for {
- queryRequest <- queryRequests
- } yield {
- val queryOption = queryRequest.query.queryOption
- val queryParam = queryRequest.queryParam
- val shouldBuildParents = queryOption.returnTree || queryParam.whereHasParent
- val parentEdges = if (shouldBuildParents) prevStepEdges.getOrElse(queryRequest.vertex.id, Nil) else Nil
- fetch(queryRequest, isInnerCall = false, parentEdges)
- }
-
- val grouped: Deferred[util.ArrayList[StepResult]] = Deferred.groupInOrder(defers)
- grouped.map(emptyStepResult) { queryResults: util.ArrayList[StepResult] =>
- queryResults.toSeq
- }.toFuture(emptyStepResult)
- }
-
- def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
- val defer = fetchKeyValuesInner(rpc)
- defer.toFuture(emptyKeyValues).map { kvsArr =>
- kvsArr.map { kv =>
- implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
- }
- }
- }
-
- override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
- val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
- val distinctLabels = labels.toSet
- val scan = AsynchbasePatcher.newScanner(client, hTableName)
- scan.setFamily(Serializable.edgeCf)
- scan.setMaxVersions(1)
-
- scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
- case null => Seq.empty
- case kvsLs =>
- kvsLs.flatMap { kvs =>
- kvs.flatMap { kv =>
- val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-
- serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
- .fromKeyValues(Seq(kv), None)
- .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
- }
- }
- }
- }
-
- Future.sequence(futures).map(_.flatten)
- }
-
- override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
- val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
- val distinctColumns = columns.toSet
- val scan = AsynchbasePatcher.newScanner(client, hTableName)
- scan.setFamily(Serializable.vertexCf)
- scan.setMaxVersions(1)
-
- scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
- case null => Seq.empty
- case kvsLs =>
- kvsLs.flatMap { kvs =>
- serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs, None)
- .filter(v => distinctColumns(v.serviceColumn))
- }
- }
- }
- Future.sequence(futures).map(_.flatten)
- }
-
-
- /**
- * we are using future cache to squash requests into same key on storage.
- *
- * @param queryRequest
- * @param isInnerCall
- * @param parentEdges
- * @return we use Deferred here since it has much better performance compared to scala.concurrent.Future.
- * seems like map, flatMap on scala.concurrent.Future is slower than Deferred's addCallback
- */
- private def fetch(queryRequest: QueryRequest,
- isInnerCall: Boolean,
- parentEdges: Seq[EdgeWithScore])(implicit ec: ExecutionContext): Deferred[StepResult] = {
-
- def fetchInner(hbaseRpc: AsyncRPC): Deferred[StepResult] = {
- val prevStepScore = queryRequest.prevStepScore
- val fallbackFn: (Exception => StepResult) = { ex =>
- logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
- StepResult.Failure
- }
-
- val queryParam = queryRequest.queryParam
- fetchKeyValuesInner(hbaseRpc).mapWithFallback(emptyKeyValues)(fallbackFn) { kvs =>
- val (startOffset, len) = queryParam.label.schemaVersion match {
- case HBaseType.VERSION4 =>
- val offset = if (queryParam.cursorOpt.isDefined) 0 else queryParam.offset
- (offset, queryParam.limit)
- case _ => (0, kvs.length)
- }
-
- io.toEdges(kvs, queryRequest, prevStepScore, isInnerCall, parentEdges, startOffset, len)
- }
- }
-
- val queryParam = queryRequest.queryParam
- val cacheTTL = queryParam.cacheTTLInMillis
- /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
-
- val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
- val request = buildRequest(queryRequest, edge)
-
- val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
- val requestCacheKey = Bytes.add(toCacheKeyBytes(request), intervalMaxBytes, intervalMinBytes)
-
- if (cacheTTL <= 0) fetchInner(request)
- else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.queryOption.cacheKeyBytes, requestCacheKey)
-
- // val cacheKeyBytes = toCacheKeyBytes(request)
- val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
- futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
- }
- }
-
- /**
- * Private Methods which is specific to Asynchbase implementation.
- */
- private def fetchKeyValuesInner(rpc: AsyncRPC)(implicit ec: ExecutionContext): Deferred[util.ArrayList[KeyValue]] = {
- rpc match {
- case Left(get) => client.get(get)
- case Right(ScanWithRange(scanner, offset, limit)) =>
- val fallbackFn: (Exception => util.ArrayList[KeyValue]) = { ex =>
- logger.error(s"fetchKeyValuesInner failed.", ex)
- scanner.close()
- emptyKeyValues
- }
-
- scanner.nextRows().mapWithFallback(new util.ArrayList[util.ArrayList[KeyValue]]())(fallbackFn) { kvsLs =>
- val ls = new util.ArrayList[KeyValue]
- if (kvsLs == null) {
- } else {
- kvsLs.foreach { kvs =>
- if (kvs != null) kvs.foreach { kv => ls.add(kv) }
- else {
-
- }
- }
- }
-
- scanner.close()
- val toIndex = Math.min(ls.size(), offset + limit)
- new util.ArrayList[KeyValue](ls.subList(offset, toIndex))
- }
- case _ => Deferred.fromError(new RuntimeException(s"fetchKeyValues failed. $rpc"))
- }
- }
-
- private def toCacheKeyBytes(hbaseRpc: AsyncRPC): Array[Byte] = {
- /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */
- hbaseRpc match {
- case Left(getRequest) => getRequest.key
- case Right(ScanWithRange(scanner, offset, limit)) =>
- Bytes.add(scanner.getCurrentKey, Bytes.add(Bytes.toBytes(offset), Bytes.toBytes(limit)))
- case _ =>
- logger.error(s"toCacheKeyBytes failed. not supported class type. $hbaseRpc")
- throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
deleted file mode 100644
index b4236b9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core.S2GraphLike
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.utils.{Extensions, logger}
-import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseStorageWritable(val graph: S2GraphLike,
- val serDe: StorageSerDe,
- val reader: StorageReadable,
- val client: HBaseClient,
- val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) {
- import Extensions.DeferOps
-
- private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
- /**
- * decide how to store the given key values (Seq[SKeyValue]) into storage using the storage's client.
- * note that this should return true only when all writes succeed.
- * we assume that each storage implementation has a client as a member variable.
- *
- * @param cluster : where these key values should be stored.
- * @param kvs : sequence of SKeyValue that needs to be stored in storage.
- * @param withWait : flag to control whether to wait for an ack from storage.
- * note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
- * it never blocks a thread; it submits the work and is notified by the event loop when storage sends the ack back.
- * @return ack message from storage.
- */
- override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
- if (kvs.isEmpty) Future.successful(MutateResponse.Success)
- else {
- val _client = client(withWait)
- val (increments, putAndDeletes) = kvs.partition(_.operation == SKeyValue.Increment)
-
- /* Asynchbase IncrementRequest does not implement HasQualifiers */
- val incrementsFutures = increments.map { kv =>
- val countVal = Bytes.toLong(kv.value)
- val request = new AtomicIncrementRequest(kv.table, kv.row, kv.cf, kv.qualifier, countVal)
- val fallbackFn: (Exception => MutateResponse) = { ex =>
- logger.error(s"mutation failed. $request", ex)
- new IncrementResponse(false, -1L, -1L)
- }
- val future = _client.bufferAtomicIncrement(request).mapWithFallback(0L)(fallbackFn) { resultCount: java.lang.Long =>
- new IncrementResponse(true, resultCount.longValue(), countVal)
- }.toFuture(MutateResponse.IncrementFailure)
-
- if (withWait) future else Future.successful(MutateResponse.IncrementSuccess)
- }
-
- /* PutRequest and DeleteRequest accept byte[][] qualifiers/values. */
- val othersFutures = putAndDeletes.groupBy { kv =>
- (kv.table.toSeq, kv.row.toSeq, kv.cf.toSeq, kv.operation, kv.timestamp)
- }.map { case ((table, row, cf, operation, timestamp), groupedKeyValues) =>
-
- val durability = groupedKeyValues.head.durability
- val qualifiers = new ArrayBuffer[Array[Byte]]()
- val values = new ArrayBuffer[Array[Byte]]()
-
- groupedKeyValues.foreach { kv =>
- if (kv.qualifier != null) qualifiers += kv.qualifier
- if (kv.value != null) values += kv.value
- }
- val defer = operation match {
- case SKeyValue.Put =>
- val put = new PutRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, values.toArray, timestamp)
- put.setDurable(durability)
- _client.put(put)
- case SKeyValue.Delete =>
- val delete =
- if (qualifiers.isEmpty)
- new DeleteRequest(table.toArray, row.toArray, cf.toArray, timestamp)
- else
- new DeleteRequest(table.toArray, row.toArray, cf.toArray, qualifiers.toArray, timestamp)
- delete.setDurable(durability)
- _client.delete(delete)
- }
- if (withWait) {
- defer.toFuture(new AnyRef()).map(_ => MutateResponse.Success).recover { case ex: Exception =>
- groupedKeyValues.foreach { kv => logger.error(s"mutation failed. $kv", ex) }
- MutateResponse.Failure
- }
- } else Future.successful(MutateResponse.Success)
- }
- for {
- incrementRets <- Future.sequence(incrementsFutures)
- otherRets <- Future.sequence(othersFutures)
- } yield new MutateResponse(isSuccess = (incrementRets ++ otherRets).forall(_.isSuccess))
- }
- }
-
- /**
- * write requestKeyValue into storage only if the value currently stored matches the expected one.
- * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
- *
- * Most importantly, this has to be an 'atomic' operation.
- * While this operation is mutating requestKeyValue's snapshotEdge, other threads need to be
- * either blocked or failed in the write-write conflict case.
- *
- * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized with it to
- * prevent reads from returning wrong data.
- *
- * Best is to use the storage's concurrency control (either pessimistic or optimistic), such as transactions or
- * compareAndSet, to synchronize.
- *
- * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
- * for a storage that does not support concurrency control, the storage implementation
- * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
- * and write (writeLock).
- *
- * @param rpc
- * @param expectedOpt
- * @return
- */
- override def writeLock(rpc: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = {
- val put = new PutRequest(rpc.table, rpc.row, rpc.cf, rpc.qualifier, rpc.value, rpc.timestamp)
- val expected = expectedOpt.map(_.value).getOrElse(Array.empty)
- client(withWait = true).compareAndSet(put, expected).map(true.booleanValue())(ret => ret.booleanValue()).toFuture(true)
- .map(r => new MutateResponse(isSuccess = r))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
new file mode 100644
index 0000000..e6bf4e6
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.Extensions
+import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val client: HBaseClient,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends VertexBulkFetcher {
+
+ import AsynchbaseStorage._
+ import Extensions.DeferOps
+
+ import scala.collection.JavaConverters._
+
+ override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+ val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
+ val distinctColumns = columns.toSet
+ val scan = AsynchbasePatcher.newScanner(client, hTableName)
+ scan.setFamily(Serializable.vertexCf)
+ scan.setMaxVersions(1)
+
+ scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+ case null => Seq.empty
+ case kvsLs =>
+ kvsLs.asScala.flatMap { kvs =>
+ serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
+ .filter(v => distinctColumns(v.serviceColumn))
+ }
+ }
+ }
+ Future.sequence(futures).map(_.flatten)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
new file mode 100644
index 0000000..560dd2b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.hbase.async.HBaseClient
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class AsynchbaseVertexFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val client: HBaseClient,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends VertexFetcher {
+ import AsynchbaseStorage._
+
+ private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ val rpc = buildRequest(serDe, queryRequest, vertex)
+ AsynchbaseStorage.fetchKeyValues(client, rpc)
+ }
+
+ override def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+ def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
+ if (kvs.isEmpty) Nil
+ else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+ }
+
+ val futures = vertices.map { vertex =>
+ val queryParam = QueryParam.Empty
+ val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+ val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+ fetchKeyValues(queryRequest, vertex).map { kvs =>
+ fromResult(kvs, vertex.serviceColumn.schemaVersion)
+ } recoverWith {
+ case ex: Throwable => Future.successful(Nil)
+ }
+ }
+
+ Future.sequence(futures).map(_.flatten)
+ }
+}
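
fetchVertices above fans out one storage read per vertex and converts any individual failure into an empty result, so one bad read cannot fail the whole batch. The same pattern in isolation, as a generic sketch rather than S2Graph API:

    import scala.concurrent.{ExecutionContext, Future}

    // Fan out one async call per item; a failed item contributes Nil
    // instead of failing the combined Future (mirrors recoverWith above).
    def fetchAll[A, B](items: Seq[A])(fetchOne: A => Future[Seq[B]])
                      (implicit ec: ExecutionContext): Future[Seq[B]] =
      Future.sequence(
        items.map(item => fetchOne(item).recover { case _: Throwable => Nil })
      ).map(_.flatten)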
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
new file mode 100644
index 0000000..2ca4b35
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike}
+import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.rocksdb.RocksDB
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksEdgeBulkFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val db: RocksDB,
+ val vdb: RocksDB,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends EdgeBulkFetcher {
+ import RocksStorage._
+
+ override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
+ val edges = new ArrayBuffer[S2EdgeLike]()
+ Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
+ val distinctLabels = labels.toSet
+
+ val iter = db.newIterator()
+ try {
+ iter.seekToFirst()
+ while (iter.isValid) {
+ val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
+
+ serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+ .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+ .foreach { edge =>
+ edges += edge
+ }
+
+ iter.next()
+ }
+
+ } finally {
+ iter.close()
+ }
+ }
+
+ Future.successful(edges)
+ }
+}
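
fetchEdgesAll above is a plain full scan with RocksDB's iterator API. The bare skeleton of that scan, assuming the standard org.rocksdb JNI bindings (error handling elided):

    import org.rocksdb.{RocksDB, RocksIterator}

    // Full forward scan over a RocksDB instance: seek to the first key,
    // visit every entry, and always close the native iterator.
    def scanAll(db: RocksDB)(visit: (Array[Byte], Array[Byte]) => Unit): Unit = {
      val iter: RocksIterator = db.newIterator()
      try {
        iter.seekToFirst()
        while (iter.isValid) {
          visit(iter.key(), iter.value())
          iter.next()
        }
      } finally {
        iter.close()
      }
    }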
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
new file mode 100644
index 0000000..628c5e1
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeFetcher.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.VertexId
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksEdgeFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val db: RocksDB,
+ val vdb: RocksDB,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends EdgeFetcher {
+ import RocksStorage._
+
+ override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+ val futures = for {
+ queryRequest <- queryRequests
+ } yield {
+ val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
+ val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
+ val rpc = buildRequest(graph, serDe, queryRequest, edge)
+ fetchKeyValues(vdb, db, rpc).map { kvs =>
+ val queryParam = queryRequest.queryParam
+ val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
+ val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore =>
+ val edge = edgeWithScore.edge
+ val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
+ edge.ts >= duration._1 && edge.ts < duration._2
+ }
+
+ stepResult.copy(edgeWithScores = edgeWithScores)
+ }
+ }
+
+ Future.sequence(futures)
+ }
+}
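
One detail worth noting in fetches: the duration check keeps an edge only when from <= ts < to, a half-open window, and a missing durationOpt falls back to the full Long range. A small sketch of just that predicate, with EdgeTs as a hypothetical stand-in for EdgeWithScore:

    // Hypothetical stand-in carrying only the timestamp the filter inspects.
    case class EdgeTs(ts: Long)

    def withinDuration(edges: Seq[EdgeTs], durationOpt: Option[(Long, Long)]): Seq[EdgeTs] = {
      // None means "no window": degenerate to the full Long range, as above.
      val (from, to) = durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
      edges.filter(e => e.ts >= from && e.ts < to) // half-open: [from, to)
    }

    // Only the edge at ts = 15 survives a [10, 20) window.
    val kept = withinDuration(Seq(EdgeTs(5), EdgeTs(15), EdgeTs(25)), Some((10L, 20L)))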
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..6513442
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticEdgeFetcher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.{QueryRequest, S2EdgeLike, S2GraphLike}
+import org.apache.s2graph.core.storage.{OptimisticEdgeFetcher, SKeyValue, StorageIO, StorageSerDe}
+import org.rocksdb.RocksDB
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksOptimisticEdgeFetcher(val graph: S2GraphLike,
+ val config: Config,
+ val db: RocksDB,
+ val vdb: RocksDB,
+ val serDe: StorageSerDe,
+ val io: StorageIO) extends OptimisticEdgeFetcher {
+
+ override protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ val request = RocksStorage.buildRequest(graph, serDe, queryRequest, edge)
+
+ RocksStorage.fetchKeyValues(vdb, db, request)
+ }
+}
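
RocksOptimisticEdgeFetcher delegates all real work to RocksStorage.buildRequest and fetchKeyValues, which communicate through the RocksRPC type: Left carries a point GetRequest (the snapshot-edge case, when tgtVertexInnerIdOpt is set) and Right carries a ScanWithRange (the index-edge case). A reduced sketch of that shape, with hypothetical case classes standing in for RocksHelper's:

    // Hypothetical reduced forms of RocksHelper.GetRequest / ScanWithRange.
    case class GetRequestSketch(key: Array[Byte])
    case class ScanWithRangeSketch(startKey: Array[Byte], stopKey: Array[Byte],
                                   offset: Int, limit: Int)

    type RocksRpcSketch = Either[GetRequestSketch, ScanWithRangeSketch]

    // One exhaustive match covers both read paths.
    def describe(rpc: RocksRpcSketch): String = rpc match {
      case Left(_)     => "point lookup (single snapshot edge)"
      case Right(scan) => s"range scan, offset=${scan.offset}, limit=${scan.limit} (index edges)"
    }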
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
new file mode 100644
index 0000000..ec77d3f
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksOptimisticMutator.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.rocks
+
+import java.util.concurrent.locks.ReentrantLock
+
+import com.google.common.cache.{Cache, LoadingCache}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.s2graph.core.S2GraphLike
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.utils.logger
+import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class RocksOptimisticMutator(val graph: S2GraphLike,
+ val serDe: StorageSerDe,
+ val optimisticEdgeFetcher: OptimisticEdgeFetcher,
+ val db: RocksDB,
+ val vdb: RocksDB,
+ val lockMap: LoadingCache[String, ReentrantLock]) extends OptimisticMutator {
+
+ override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = {
+ if (kvs.isEmpty) {
+ Future.successful(MutateResponse.Success)
+ } else {
+ val ret = {
+ val (kvsV, kvsE) = kvs.partition(kv => Bytes.equals(kv.cf, SKeyValue.VertexCf))
+ val writeBatchV = buildWriteBatch(kvsV)
+ val writeBatchE = buildWriteBatch(kvsE)
+ val writeOptions = new WriteOptions
+ try {
+ vdb.write(writeOptions, writeBatchV)
+ db.write(writeOptions, writeBatchE)
+ true
+ } catch {
+ case e: Exception =>
+ logger.error(s"writeAsyncSimple failed.", e)
+ false
+ } finally {
+ writeBatchV.close()
+ writeBatchE.close()
+ writeOptions.close()
+ }
+ }
+
+ Future.successful(new MutateResponse(ret))
+ }
+ }
+
+
+ override def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext) = {
+ def op = {
+ val writeOptions = new WriteOptions
+ try {
+ val fetchedValue = db.get(requestKeyValue.row)
+ val innerRet = expectedOpt match {
+ case None =>
+ if (fetchedValue == null) {
+
+ db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+ true
+ } else {
+ false
+ }
+ case Some(kv) =>
+ if (fetchedValue == null) {
+ false
+ } else {
+ if (Bytes.compareTo(fetchedValue, kv.value) == 0) {
+ db.put(writeOptions, requestKeyValue.row, requestKeyValue.value)
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ Future.successful(new MutateResponse(innerRet))
+ } catch {
+ case e: RocksDBException =>
+ logger.error(s"Write lock failed", e)
+ Future.successful(MutateResponse.Failure)
+ } finally {
+ writeOptions.close()
+ }
+ }
+
+ withLock(requestKeyValue.row)(op)
+ }
+
+ private def buildWriteBatch(kvs: Seq[SKeyValue]): WriteBatch = {
+ val writeBatch = new WriteBatch()
+ kvs.foreach { kv =>
+ kv.operation match {
+ case SKeyValue.Put => writeBatch.put(kv.row, kv.value)
+ case SKeyValue.Delete => writeBatch.remove(kv.row)
+ case SKeyValue.Increment => writeBatch.merge(kv.row, kv.value)
+ case _ => throw new RuntimeException(s"Unsupported rpc operation: ${kv.operation}")
+ }
+ }
+ writeBatch
+ }
+
+ private def withLock[A](key: Array[Byte])(op: => A): A = {
+ val lockKey = Bytes.toString(key)
+ val lock = lockMap.get(lockKey)
+
+ lock.lock()
+ try {
+ op
+ } finally {
+ lock.unlock()
+ }
+ }
+}
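
The heart of RocksOptimisticMutator is writeLock: a compare-and-set guarded by an in-process ReentrantLock per row key. Under the lock it reads the current value, writes only when that value matches the expected one (or when both are absent), and reports the outcome as a MutateResponse. A minimal sketch of the same pattern over a mutable Map, assuming only Guava's LoadingCache for the lock striping (the cache size is arbitrary):

    import java.util.concurrent.locks.ReentrantLock
    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    import scala.collection.mutable

    // One ReentrantLock per key, mirroring lockMap in the mutator above.
    val locks: LoadingCache[String, ReentrantLock] =
      CacheBuilder.newBuilder()
        .maximumSize(10000)
        .build(new CacheLoader[String, ReentrantLock] {
          override def load(key: String): ReentrantLock = new ReentrantLock()
        })

    val store = mutable.Map.empty[String, String] // stand-in for RocksDB

    // Succeeds only when the stored value still equals `expected`.
    def compareAndSet(key: String, expected: Option[String], next: String): Boolean = {
      val lock = locks.get(key)
      lock.lock()
      try {
        if (store.get(key) == expected) { store(key) = next; true } else false
      } finally {
        lock.unlock()
      }
    }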
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
index b24e375..8948e13 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala
@@ -19,22 +19,30 @@
package org.apache.s2graph.core.storage.rocks
+import java.util.Base64
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.hash.Hashing
import com.typesafe.config.Config
+import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe}
-import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC
+import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
+import org.apache.s2graph.core.storage.serde.StorageSerializable
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.types.VertexId
import org.apache.s2graph.core.utils.logger
import org.rocksdb._
import org.rocksdb.util.SizeUnit
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Random, Try}
object RocksStorage {
+ val table = Array.emptyByteArray
+ val qualifier = Array.emptyByteArray
RocksDB.loadLibrary()
@@ -129,6 +137,84 @@ object RocksStorage {
throw e
}
}
+
+ def buildRequest(graph: S2GraphLike, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
+ queryRequest.queryParam.tgtVertexInnerIdOpt match {
+ case None => // indexEdges
+ val queryParam = queryRequest.queryParam
+ val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
+ val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can't find index for query $queryParam"))
+ val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
+ val labelWithDirBytes = indexEdge.labelWithDir.bytes
+ val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
+
+ val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
+
+ val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
+ val (startKey, stopKey) =
+ if (queryParam.intervalOpt.isDefined) {
+ val _startKey = queryParam.cursorOpt match {
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
+ case None => Bytes.add(baseKey, intervalMaxBytes)
+ }
+ (_startKey, Bytes.add(baseKey, intervalMinBytes))
+ } else {
+ val _startKey = queryParam.cursorOpt match {
+ case Some(cursor) => Base64.getDecoder.decode(cursor)
+ case None => baseKey
+ }
+ (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
+ }
+
+ Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit))
+
+ case Some(tgtId) => // snapshotEdge
+ val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
+ Left(GetRequest(SKeyValue.EdgeCf, kv.row))
+ }
+ }
+
+ def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
+ val startKey = vertex.id.bytes
+ val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue))
+
+ Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue))
+ }
+
+ def fetchKeyValues(vdb: RocksDB, db: RocksDB, rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
+ rpc match {
+ case Left(GetRequest(cf, key)) =>
+ val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
+ val v = _db.get(key)
+
+ val kvs =
+ if (v == null) Seq.empty
+ else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
+
+ Future.successful(kvs)
+ case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) =>
+ val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
+ val kvs = new ArrayBuffer[SKeyValue]()
+ val iter = _db.newIterator()
+
+ try {
+ var idx = 0
+ iter.seek(startKey)
+ val (startOffset, len) = (offset, limit)
+ while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) {
+ if (idx >= startOffset) {
+ kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis())
+ }
+
+ iter.next()
+ idx += 1
+ }
+ } finally {
+ iter.close()
+ }
+ Future.successful(kvs)
+ }
+ }
}
class RocksStorage(override val graph: S2GraphLike,
@@ -150,12 +236,18 @@ class RocksStorage(override val graph: S2GraphLike,
.maximumSize(1000 * 10 * 10 * 10 * 10)
.build[String, ReentrantLock](cacheLoader)
- override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
+ private lazy val optimisticEdgeFetcher = new RocksOptimisticEdgeFetcher(graph, config, db, vdb, serDe, io)
+ private lazy val optimisticMutator = new RocksOptimisticMutator(graph, serDe, optimisticEdgeFetcher, db, vdb, lockMap)
+ private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
+ override val management: StorageManagement = new RocksStorageManagement(config, vdb, db)
override val serDe: StorageSerDe = new RocksStorageSerDe(graph)
- override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io)
-
- override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap)
+ override val edgeFetcher: EdgeFetcher = new RocksEdgeFetcher(graph, config, db, vdb, serDe, io)
+ override val edgeBulkFetcher: EdgeBulkFetcher = new RocksEdgeBulkFetcher(graph, config, db, vdb, serDe, io)
+ override val vertexFetcher: VertexFetcher = new RocksVertexFetcher(graph, config, db, vdb, serDe, io)
+ override val vertexBulkFetcher: VertexBulkFetcher = new RocksVertexBulkFetcher(graph, config, db, vdb, serDe, io)
+ override val edgeMutator: EdgeMutator = _mutator
+ override val vertexMutator: VertexMutator = _mutator
}
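
The RocksStorage wiring above is the payoff of this refactor: the monolithic reader/writer pair is replaced by single-purpose slots, and one DefaultOptimisticMutator instance fills both mutator slots. A reduced sketch of the shape, with stub traits standing in for the real interfaces in org.apache.s2graph.core.storage:

    // Stub traits; the real ones expose fetch/mutate methods returning Futures.
    trait EdgeFetcherSketch
    trait VertexFetcherSketch
    trait EdgeMutatorSketch
    trait VertexMutatorSketch

    // A single implementation may satisfy several slots, as _mutator does above.
    class OptimisticMutatorSketch extends EdgeMutatorSketch with VertexMutatorSketch

    class StorageSketch(val edgeFetcher: EdgeFetcherSketch,
                        val vertexFetcher: VertexFetcherSketch,
                        mutator: OptimisticMutatorSketch) {
      val edgeMutator: EdgeMutatorSketch = mutator
      val vertexMutator: VertexMutatorSketch = mutator
    }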
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
deleted file mode 100644
index 27e3efd..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import java.util.Base64
-
-import com.typesafe.config.Config
-import org.apache.hadoop.hbase.util.Bytes
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.{Label, ServiceColumn}
-import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange}
-import org.apache.s2graph.core.storage.serde.StorageSerializable
-import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.types.{HBaseType, VertexId}
-import org.rocksdb.RocksDB
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksStorageReadable(val graph: S2GraphLike,
- val config: Config,
- val db: RocksDB,
- val vdb: RocksDB,
- val serDe: StorageSerDe,
- override val io: StorageIO) extends StorageReadable {
-
- private val table = Array.emptyByteArray
- private val qualifier = Array.emptyByteArray
-
- private def buildRequest(queryRequest: QueryRequest, edge: S2EdgeLike): RocksRPC = {
- queryRequest.queryParam.tgtVertexInnerIdOpt match {
- case None => // indexEdges
- val queryParam = queryRequest.queryParam
- val indexEdgeOpt = edge.edgesWithIndex.filter(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq).headOption
- val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
- val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
- val labelWithDirBytes = indexEdge.labelWithDir.bytes
- val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
-
- val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
-
- val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
- val (startKey, stopKey) =
- if (queryParam.intervalOpt.isDefined) {
- val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Base64.getDecoder.decode(cursor)
- case None => Bytes.add(baseKey, intervalMaxBytes)
- }
- (_startKey, Bytes.add(baseKey, intervalMinBytes))
- } else {
- val _startKey = queryParam.cursorOpt match {
- case Some(cursor) => Base64.getDecoder.decode(cursor)
- case None => baseKey
- }
- (_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
- }
-
- Right(ScanWithRange(SKeyValue.EdgeCf, startKey, stopKey, queryParam.innerOffset, queryParam.innerLimit))
-
- case Some(tgtId) => // snapshotEdge
- val kv = serDe.snapshotEdgeSerializer(graph.elementBuilder.toRequestEdge(queryRequest, Nil).toSnapshotEdge).toKeyValues.head
- Left(GetRequest(SKeyValue.EdgeCf, kv.row))
- }
- }
-
- private def buildRequest(queryRequest: QueryRequest, vertex: S2VertexLike): RocksRPC = {
- val startKey = vertex.id.bytes
- val stopKey = Bytes.add(startKey, Array.fill(1)(Byte.MaxValue))
-
- Right(ScanWithRange(SKeyValue.VertexCf, startKey, stopKey, 0, Byte.MaxValue))
-// val kv = serDe.vertexSerializer(vertex).toKeyValues.head
-// Left(GetRequest(SKeyValue.VertexCf, kv.row))
- }
-
- override def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
- val futures = for {
- queryRequest <- queryRequests
- } yield {
- val parentEdges = prevStepEdges.getOrElse(queryRequest.vertex.id, Nil)
- val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges)
- val rpc = buildRequest(queryRequest, edge)
- fetchKeyValues(rpc).map { kvs =>
- val queryParam = queryRequest.queryParam
- val stepResult = io.toEdges(kvs, queryRequest, queryRequest.prevStepScore, false, parentEdges)
- val edgeWithScores = stepResult.edgeWithScores.filter { case edgeWithScore =>
- val edge = edgeWithScore.edge
- val duration = queryParam.durationOpt.getOrElse((Long.MinValue, Long.MaxValue))
- edge.ts >= duration._1 && edge.ts < duration._2
- }
-
- stepResult.copy(edgeWithScores = edgeWithScores)
- }
- }
-
- Future.sequence(futures)
- }
-
- private def fetchKeyValues(rpc: RocksRPC)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
- rpc match {
- case Left(GetRequest(cf, key)) =>
- val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
- val v = _db.get(key)
-
- val kvs =
- if (v == null) Seq.empty
- else Seq(SKeyValue(table, key, cf, qualifier, v, System.currentTimeMillis()))
-
- Future.successful(kvs)
- case Right(ScanWithRange(cf, startKey, stopKey, offset, limit)) =>
- val _db = if (Bytes.equals(cf, SKeyValue.VertexCf)) vdb else db
- val kvs = new ArrayBuffer[SKeyValue]()
- val iter = _db.newIterator()
-
- try {
- var idx = 0
- iter.seek(startKey)
- val (startOffset, len) = (offset, limit)
- while (iter.isValid && Bytes.compareTo(iter.key, stopKey) <= 0 && idx < startOffset + len) {
- if (idx >= startOffset) {
- kvs += SKeyValue(table, iter.key, cf, qualifier, iter.value, System.currentTimeMillis())
- }
-
- iter.next()
- idx += 1
- }
- } finally {
- iter.close()
- }
- Future.successful(kvs)
- }
- }
-
- override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
- val edges = new ArrayBuffer[S2EdgeLike]()
- Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
- val distinctLabels = labels.toSet
-
- val iter = db.newIterator()
- try {
- iter.seekToFirst()
- while (iter.isValid) {
- val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
-
- serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
- .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
- .foreach { edge =>
- edges += edge
- }
-
-
- iter.next()
- }
-
- } finally {
- iter.close()
- }
- }
-
- Future.successful(edges)
- }
-
- override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
- import scala.collection.mutable
-
- val vertices = new ArrayBuffer[S2VertexLike]()
- ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.foreach { case (hTableName, columns) =>
- val distinctColumns = columns.toSet
-
- val iter = vdb.newIterator()
- val buffer = mutable.ListBuffer.empty[SKeyValue]
- var oldVertexIdBytes = Array.empty[Byte]
- var minusPos = 0
-
- try {
- iter.seekToFirst()
- while (iter.isValid) {
- val row = iter.key()
- if (!Bytes.equals(oldVertexIdBytes, 0, oldVertexIdBytes.length - minusPos, row, 0, row.length - 1)) {
- if (buffer.nonEmpty)
- serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
- .filter(v => distinctColumns(v.serviceColumn))
- .foreach { vertex =>
- vertices += vertex
- }
-
- oldVertexIdBytes = row
- minusPos = 1
- buffer.clear()
- }
- val kv = SKeyValue(table, iter.key(), SKeyValue.VertexCf, qualifier, iter.value(), System.currentTimeMillis())
- buffer += kv
-
- iter.next()
- }
- if (buffer.nonEmpty)
- serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(buffer, None)
- .filter(v => distinctColumns(v.serviceColumn))
- .foreach { vertex =>
- vertices += vertex
- }
-
- } finally {
- iter.close()
- }
- }
-
- Future.successful(vertices)
- }
-
- override def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext) = {
- fetchKeyValues(buildRequest(queryRequest, edge))
- }
-
- override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext) = {
- fetchKeyValues(buildRequest(queryRequest, vertex))
- }
-}