You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2017/10/30 10:23:34 UTC
[6/8] incubator-s2graph git commit: remove type parameter on Storage Trait (#13)
remove type parameter on Storage Trait (#13)
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/55d194ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/55d194ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/55d194ed
Branch: refs/heads/master
Commit: 55d194ed8edfb0ec78ce5723c379c20fa7462632
Parents: f416194
Author: daewon <da...@apache.org>
Authored: Mon Oct 30 17:07:48 2017 +0900
Committer: Doyung Yoon <st...@apache.org>
Committed: Mon Oct 30 10:07:48 2017 +0200
----------------------------------------------------------------------
.../scala/org/apache/s2graph/core/S2Graph.scala | 54 ++++++--------------
.../apache/s2graph/core/storage/SKeyValue.scala | 13 +++--
.../apache/s2graph/core/storage/Storage.scala | 16 ++----
.../s2graph/core/storage/StorageReadable.scala | 30 +++++------
.../storage/WriteWriteConflictResolver.scala | 6 +--
.../core/storage/hbase/AsynchbaseStorage.scala | 5 +-
.../hbase/AsynchbaseStorageReadable.scala | 46 ++++++++++++++---
7 files changed, 84 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 92f68dc..34db9e4 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -133,7 +133,7 @@ object S2Graph {
new S2Graph(configuration)(ec)
}
- def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = {
+ def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage = {
val storageBackend = config.getString("s2graph.storage.backend")
logger.info(s"[InitStorage]: $storageBackend")
@@ -908,7 +908,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
/**
* TODO: we need to some way to handle malformed configuration for storage.
*/
- val storagePool: scala.collection.mutable.Map[String, Storage[_]] = {
+ val storagePool: scala.collection.mutable.Map[String, Storage] = {
val labels = Label.findAll()
val services = Service.findAll()
@@ -919,12 +919,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
confWithFallback(conf)
}.toSet
- val pools = new java.util.HashMap[Config, Storage[_]]()
+ val pools = new java.util.HashMap[Config, Storage]()
configs.foreach { config =>
pools.put(config, S2Graph.initStorage(this, config)(ec))
}
- val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]()
+ val m = new java.util.concurrent.ConcurrentHashMap[String, Storage]()
labels.foreach { label =>
if (label.storageConfigOpt.isDefined) {
@@ -941,7 +941,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
m
}
- val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec)
+ val defaultStorage: Storage = S2Graph.initStorage(this, config)(ec)
/** QueryLevel FutureCache */
val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty)
@@ -953,11 +953,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val indexProvider = IndexProvider.apply(config)
- def getStorage(service: Service): Storage[_] = {
+ def getStorage(service: Service): Storage = {
storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage)
}
- def getStorage(label: Label): Storage[_] = {
+ def getStorage(label: Label): Storage = {
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
@@ -975,7 +975,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
val futures = for {
edge <- edges
} yield {
- getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) =>
+ getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
}
}
@@ -1145,31 +1145,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
- def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = {
- def fromResult(kvs: Seq[SKeyValue],
- version: String): Option[S2Vertex] = {
- if (kvs.isEmpty) None
- else storage.vertexDeserializer(version).fromKeyValues(kvs, None)
- // .map(S2Vertex(graph, _))
- }
-
- val futures = vertices.map { vertex =>
- val queryParam = QueryParam.Empty
- val q = Query.toQuery(Seq(vertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
- val rpc = storage.buildRequest(queryRequest, vertex)
- storage.fetchKeyValues(rpc).map { kvs =>
- fromResult(kvs, vertex.serviceColumn.schemaVersion)
- } recoverWith { case ex: Throwable =>
- Future.successful(None)
- }
- }
-
- Future.sequence(futures).map { result => result.toList.flatten }
- }
val verticesWithIdx = vertices.zipWithIndex
val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
- getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2)))
+ getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
@@ -1272,7 +1250,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult,
+ private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int): Future[Boolean] = {
if (stepInnerResult.isEmpty) Future.successful(true)
@@ -1426,7 +1404,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
+ private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = {
val edgeWithIdxs = _edges.zipWithIndex
val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
@@ -1462,7 +1440,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
- private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge],
+ private def mutateEdgesInner(storage: Storage)(edges: Seq[S2Edge],
checkConsistency: Boolean,
withWait: Boolean): Future[MutateResponse] = {
assert(edges.nonEmpty)
@@ -1483,14 +1461,14 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
} else {
- storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+ storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
}
}
}
def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = {
- def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
+ def mutateVertex(storage: Storage)(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = {
if (vertex.op == GraphUtil.operations("delete")) {
storage.writeToStorage(vertex.hbaseZkAddr,
storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
@@ -1502,7 +1480,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
}
}
- def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex],
+ def mutateVertices(storage: Storage)(vertices: Seq[S2Vertex],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) }
Future.sequence(futures)
@@ -1518,7 +1496,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph
def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
- def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
+ def incrementCounts(storage: Storage)(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = {
val futures = for {
edge <- edges
} yield {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
index 924d9a3..775afda 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala
@@ -56,18 +56,17 @@ trait CanSKeyValue[T] {
}
object CanSKeyValue {
+ def instance[T](f: T => SKeyValue): CanSKeyValue[T] = new CanSKeyValue[T] {
+ override def toSKeyValue(from: T): SKeyValue = f.apply(from)
+ }
// For asyncbase KeyValues
- implicit val asyncKeyValue = new CanSKeyValue[KeyValue] {
- def toSKeyValue(kv: KeyValue): SKeyValue = {
- SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
- }
+ implicit val asyncKeyValue = instance[KeyValue] { kv =>
+ SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp())
}
// For asyncbase KeyValues
- implicit val sKeyValue = new CanSKeyValue[SKeyValue] {
- def toSKeyValue(kv: SKeyValue): SKeyValue = kv
- }
+ implicit val sKeyValue = instance[SKeyValue](identity)
// For hbase KeyValues
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 2fe6e42..e4eafbf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializa
import org.apache.s2graph.core.types._
import scala.concurrent.{ExecutionContext, Future}
-abstract class Storage[Q](val graph: S2Graph,
+abstract class Storage(val graph: S2Graph,
val config: Config) {
/* Storage backend specific resource management */
val management: StorageManagement
@@ -39,7 +39,7 @@ abstract class Storage[Q](val graph: S2Graph,
* Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
* then convert them into Edge/Vertex
*/
- val fetcher: StorageReadable[Q]
+ val fetcher: StorageReadable
/*
* Serialize Edge/Vertex, to common KeyValue, SKeyValue that
@@ -61,7 +61,6 @@ abstract class Storage[Q](val graph: S2Graph,
*/
lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher)
-
/** IO **/
def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] =
serDe.snapshotEdgeSerializer(snapshotEdge)
@@ -115,22 +114,18 @@ abstract class Storage[Q](val graph: S2Graph,
mutator.writeLock(requestKeyValue, expectedOpt)
/** Fetch **/
-
- def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q = fetcher.buildRequest(queryRequest, edge)
-
- def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q = fetcher.buildRequest(queryRequest, vertex)
-
def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] =
fetcher.fetches(queryRequests, prevStepEdges)
- def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = fetcher.fetchKeyValues(rpc)
+ def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] =
+ fetcher.fetchVertices(vertices)
def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]] = fetcher.fetchEdgesAll()
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]] = fetcher.fetchVerticesAll()
- def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] =
+ def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] =
fetcher.fetchSnapshotEdgeInner(edge)
/** Conflict Resolver **/
@@ -149,6 +144,5 @@ abstract class Storage[Q](val graph: S2Graph,
def shutdown(): Unit = management.shutdown()
-
def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index 7a0d8ef..03b01fd 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -26,21 +26,9 @@ import org.apache.s2graph.core.utils.logger
import scala.concurrent.{ExecutionContext, Future}
-trait StorageReadable[Q] {
+trait StorageReadable {
val io: StorageIO
- /**
- * build proper request which is specific into storage to call fetchIndexEdgeKeyValues or fetchSnapshotEdgeKeyValues.
- * for example, Asynchbase use GetRequest, Scanner so this method is responsible to build
- * client request(GetRequest, Scanner) based on user provided query.
- *
- * @param queryRequest
- * @return
- */
- def buildRequest(queryRequest: QueryRequest, edge: S2Edge): Q
-
- def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex): Q
-
- /**
+ /**
* responsible to fire parallel fetch call into storage and create future that will return merged result.
*
* @param queryRequests
@@ -50,13 +38,18 @@ trait StorageReadable[Q] {
def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
- def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+// private def fetchKeyValues(rpc: Q)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+ def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2Edge]]
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2Vertex]]
- def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = {
+ protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+ protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+ def fetchSnapshotEdgeInner(edge: S2Edge)(implicit ec: ExecutionContext): Future[(Option[S2Edge], Option[SKeyValue])] = {
val queryParam = QueryParam(labelName = edge.innerLabel.label,
direction = GraphUtil.fromDirection(edge.labelWithDir.dir),
tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
@@ -64,15 +57,16 @@ trait StorageReadable[Q] {
val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam))
val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam)
- fetchKeyValues(buildRequest(queryRequest, edge)).map { kvs =>
+ fetchKeyValues(queryRequest, edge).map { kvs =>
val (edgeOpt, kvOpt) =
if (kvs.isEmpty) (None, None)
else {
+ import CanSKeyValue._
val snapshotEdgeOpt = io.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil)
val _kvOpt = kvs.headOption
(snapshotEdgeOpt, _kvOpt)
}
- (queryParam, edgeOpt, kvOpt)
+ (edgeOpt, kvOpt)
} recoverWith { case ex: Throwable =>
logger.error(s"fetchQueryParam failed. fallback return.", ex)
throw new FetchTimeoutException(s"${edge.toLogString}")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index 79b764d..227cfa7 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
serDe: StorageSerDe,
io: StorageIO,
mutator: StorageWritable,
- fetcher: StorageReadable[_]) {
+ fetcher: StorageReadable) {
val BackoffTimeout = graph.BackoffTimeout
val MaxRetryNum = graph.MaxRetryNum
@@ -69,7 +69,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
/* fetch failed. re-fetch should be done */
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+ fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -91,7 +91,7 @@ class WriteWriteConflictResolver(graph: S2Graph,
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
/* fetch failed. re-fetch should be done */
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) =>
+ fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 54007d5..ef1350a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -27,7 +27,6 @@ import com.typesafe.config.Config
import org.apache.commons.io.FileUtils
import org.apache.s2graph.core._
import org.apache.s2graph.core.storage._
-import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage.AsyncRPC
import org.apache.s2graph.core.utils._
import org.hbase.async._
import org.apache.s2graph.core.storage.serde._
@@ -140,7 +139,7 @@ object AsynchbaseStorage {
class AsynchbaseStorage(override val graph: S2Graph,
- override val config: Config) extends Storage[AsyncRPC](graph, config) {
+ override val config: Config) extends Storage(graph, config) {
/**
* since some runtime environment such as spark cluster has issue with guava version, that is used in Asynchbase.
@@ -156,7 +155,7 @@ class AsynchbaseStorage(override val graph: S2Graph,
override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
- override val fetcher: StorageReadable[AsyncRPC] = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
+ override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io)
// val hbaseExecutor: ExecutorService =
// if (config.getString("hbase.zookeeper.quorum") == "localhost")
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/55d194ed/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
index 1cb6109..0dc8491 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala
@@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ *
*/
package org.apache.s2graph.core.storage.hbase
@@ -42,7 +42,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
val config: Config,
val client: HBaseClient,
val serDe: StorageSerDe,
- override val io: StorageIO) extends StorageReadable[AsyncRPC] {
+ override val io: StorageIO) extends StorageReadable {
import Extensions.DeferOps
import CanDefer._
@@ -67,7 +67,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
* @param queryRequest
* @return
*/
- override def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
+ private def buildRequest(queryRequest: QueryRequest, edge: S2Edge) = {
import Serializable._
val queryParam = queryRequest.queryParam
val label = queryParam.label
@@ -168,15 +168,26 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
* @param vertex
* @return
*/
- override def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = {
+ private def buildRequest(queryRequest: QueryRequest, vertex: S2Vertex) = {
val kvs = serDe.vertexSerializer(vertex).toKeyValues
val get = new GetRequest(vertex.hbaseTableName.getBytes, kvs.head.row, Serializable.vertexCf)
// get.setTimeout(this.singleGetTimeout.toShort)
get.setFailfast(true)
get.maxVersions(1)
+
Left(get)
}
+ override def fetchKeyValues(queryRequest: QueryRequest, edge: S2Edge)(implicit ec: ExecutionContext) = {
+ val rpc = buildRequest(queryRequest, edge)
+ fetchKeyValues(rpc)
+ }
+
+ override def fetchKeyValues(queryRequest: QueryRequest, vertex: S2Vertex)(implicit ec: ExecutionContext) = {
+ val rpc = buildRequest(queryRequest, vertex)
+ fetchKeyValues(rpc)
+ }
+
/**
* responsible to fire parallel fetch call into storage and create future that will return merged result.
*
@@ -201,7 +212,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
}.toFuture(emptyStepResult)
}
- override def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
+ def fetchKeyValues(rpc: AsyncRPC)(implicit ec: ExecutionContext) = {
val defer = fetchKeyValuesInner(rpc)
defer.toFuture(emptyKeyValues).map { kvsArr =>
kvsArr.map { kv =>
@@ -224,7 +235,8 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
kvs.flatMap { kv =>
val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
- serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
+ serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+ .fromKeyValues(Seq(kv), None)
.filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree)
}
}
@@ -234,6 +246,27 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
Future.sequence(futures).map(_.flatten)
}
+ override def fetchVertices(vertices: Seq[S2Vertex])(implicit ec: ExecutionContext) = {
+ def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2Vertex] = {
+ if (kvs.isEmpty) Nil
+ else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+ }
+
+ val futures = vertices.map { vertex =>
+ val queryParam = QueryParam.Empty
+ val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+ val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+
+ fetchKeyValues(queryRequest, vertex).map { kvs =>
+ fromResult(kvs, vertex.serviceColumn.schemaVersion)
+ } recoverWith {
+ case ex: Throwable => Future.successful(Nil)
+ }
+ }
+
+ Future.sequence(futures).map(_.flatten)
+ }
+
override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
val distinctColumns = columns.toSet
@@ -351,4 +384,5 @@ class AsynchbaseStorageReadable(val graph: S2Graph,
throw new RuntimeException(s"toCacheKeyBytes: $hbaseRpc")
}
}
+
}