You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:44 UTC
[04/11] incubator-s2graph git commit: separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...
separate Storage into multiple small interfaces such as EdgeFetcher/VertexMutator, ...
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/43f627e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/43f627e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/43f627e5
Branch: refs/heads/master
Commit: 43f627e551fd0b744a858c9e6e7feba7fd68e58c
Parents: 2357d81
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed May 9 15:58:19 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed May 9 16:56:37 2018 +0900
----------------------------------------------------------------------
.../apache/s2graph/core/EdgeBulkFetcher.scala | 28 ++
.../org/apache/s2graph/core/EdgeFetcher.scala | 35 ++
.../org/apache/s2graph/core/EdgeMutator.scala | 38 ++
.../scala/org/apache/s2graph/core/Fetcher.scala | 36 --
.../org/apache/s2graph/core/Management.scala | 4 +-
.../scala/org/apache/s2graph/core/Mutator.scala | 41 ---
.../scala/org/apache/s2graph/core/S2Graph.scala | 51 ++-
.../org/apache/s2graph/core/S2GraphLike.scala | 25 +-
.../apache/s2graph/core/TraversalHelper.scala | 6 +-
.../apache/s2graph/core/VertexBulkFetcher.scala | 26 ++
.../org/apache/s2graph/core/VertexFetcher.scala | 31 ++
.../org/apache/s2graph/core/VertexMutator.scala | 28 ++
.../s2graph/core/fetcher/FetcherManager.scala | 106 ++++++
.../core/fetcher/MemoryModelEdgeFetcher.scala | 54 +++
.../s2graph/core/model/ImportStatus.scala | 59 ---
.../apache/s2graph/core/model/Importer.scala | 122 ------
.../s2graph/core/model/MemoryModelFetcher.scala | 59 ---
.../s2graph/core/model/ModelManager.scala | 103 ------
.../core/storage/DefaultOptimisticMutator.scala | 190 ++++++++++
.../core/storage/OptimisticEdgeFetcher.scala | 56 +++
.../core/storage/OptimisticMutator.scala | 63 ++++
.../apache/s2graph/core/storage/Storage.scala | 72 +---
.../s2graph/core/storage/StorageReadable.scala | 49 +--
.../s2graph/core/storage/StorageWritable.scala | 65 ----
.../storage/WriteWriteConflictResolver.scala | 6 +-
.../hbase/AsynchbaseEdgeBulkFetcher.scala | 69 ++++
.../storage/hbase/AsynchbaseEdgeFetcher.scala | 120 ++++++
.../hbase/AsynchbaseOptimisticEdgeFetcher.scala | 35 ++
.../hbase/AsynchbaseOptimisticMutator.scala | 142 +++++++
.../core/storage/hbase/AsynchbaseStorage.scala | 188 +++++++++-
.../hbase/AsynchbaseStorageReadable.scala | 367 -------------------
.../hbase/AsynchbaseStorageWritable.scala | 142 -------
.../hbase/AsynchbaseVertexBulkFetcher.scala | 63 ++++
.../storage/hbase/AsynchbaseVertexFetcher.scala | 61 +++
.../storage/rocks/RocksEdgeBulkFetcher.scala | 68 ++++
.../core/storage/rocks/RocksEdgeFetcher.scala | 60 +++
.../rocks/RocksOptimisticEdgeFetcher.scala | 41 +++
.../storage/rocks/RocksOptimisticMutator.scala | 133 +++++++
.../core/storage/rocks/RocksStorage.scala | 104 +++++-
.../storage/rocks/RocksStorageReadable.scala | 234 ------------
.../storage/rocks/RocksStorageWritable.scala | 133 -------
.../storage/rocks/RocksVertexBulkFetcher.scala | 88 +++++
.../core/storage/rocks/RocksVertexFetcher.scala | 61 +++
.../core/storage/serde/MutationHelper.scala | 188 ----------
.../s2graph/core/utils/ImportStatus.scala | 59 +++
.../apache/s2graph/core/utils/Importer.scala | 122 ++++++
.../s2graph/core/fetcher/EdgeFetcherTest.scala | 87 +++++
.../apache/s2graph/core/model/FetcherTest.scala | 87 -----
48 files changed, 2244 insertions(+), 1761 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
new file mode 100644
index 0000000..646f5f4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeBulkFetcher {
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
new file mode 100644
index 0000000..f28a161
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeFetcher {
+
+ def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
+ def fetches(queryRequests: Seq[QueryRequest],
+ prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
+
+ def close(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
new file mode 100644
index 0000000..dc0099e
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.MutateResponse
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait EdgeMutator {
+ def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
+
+ def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
+
+ def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
+
+ def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
deleted file mode 100644
index 57d2f29..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.types.VertexId
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait Fetcher {
-
- def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] =
- Future.successful(this)
-
- def fetches(queryRequests: Seq[QueryRequest],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
-
- def close(): Unit = {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 7ff5a9e..9046449 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -19,7 +19,7 @@
package org.apache.s2graph.core
-import java.util
+
import java.util.concurrent.Executors
import com.typesafe.config.{Config, ConfigFactory}
@@ -29,7 +29,7 @@ import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.types.HBaseType._
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.JSONParser._
-import org.apache.s2graph.core.model.Importer
+import org.apache.s2graph.core.utils.Importer
import play.api.libs.json._
import scala.concurrent.{ExecutionContext, Future}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
deleted file mode 100644
index 53161e1..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait Mutator {
- def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
- def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
-
- def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
-
- def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]]
-
- def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse]
-
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 4b2274a..c4cb48f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -27,11 +27,11 @@ import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.model.ModelManager
+import org.apache.s2graph.core.fetcher.FetcherManager
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.rocks.RocksStorage
-import org.apache.s2graph.core.storage.{MutateResponse, Storage}
+import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
@@ -187,7 +187,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override val management = new Management(this)
- override val modelManager = new ModelManager(this)
+ override val modelManager = new FetcherManager(this)
override val indexProvider = IndexProvider.apply(config)
@@ -251,21 +251,34 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
}
//TODO:
- override def getFetcher(column: ServiceColumn): Fetcher = {
- getStorage(column.service).reader
+ override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
+ getStorage(column.service).vertexFetcher
+ }
+ override def getVertexBulkFetcher: VertexBulkFetcher = {
+ defaultStorage.vertexBulkFetcher
}
- override def getFetcher(label: Label): Fetcher = {
+ override def getEdgeFetcher(label: Label): EdgeFetcher = {
if (label.fetchConfigExist) modelManager.getFetcher(label)
- else getStorage(label).reader
+ else getStorage(label).edgeFetcher
+ }
+
+ override def getEdgeBulkFetcher: EdgeBulkFetcher = {
+ defaultStorage.edgeBulkFetcher
}
- override def getMutator(column: ServiceColumn): Mutator = {
- getStorage(column.service).mutator
+ override def getVertexMutator(column: ServiceColumn): VertexMutator = {
+ getStorage(column.service).vertexMutator
}
- override def getMutator(label: Label): Mutator = {
- getStorage(label).mutator
+ override def getEdgeMutator(label: Label): EdgeMutator = {
+ getStorage(label).edgeMutator
+ }
+
+ /** optional */
+ override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
+// getStorage(label).optimisticEdgeFetcher
+ null
}
//TODO:
@@ -296,8 +309,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = {
val verticesWithIdx = vertices.zipWithIndex
- val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) =>
- getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
+ val futures = verticesWithIdx.groupBy { case (v, idx) => v.serviceColumn }.map { case (serviceColumn, vertexGroup) =>
+ getVertexFetcher(serviceColumn).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
@@ -309,7 +322,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val futures = for {
edge <- edges
} yield {
- getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
+ getOptimisticEdgeFetcher(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) =>
edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel))
}
}
@@ -324,7 +337,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike],
withWait: Boolean = false): Future[Seq[MutateResponse]] = {
val futures = vertices.map { vertex =>
- getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
+ getVertexMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait)
}
Future.sequence(futures)
}
@@ -351,7 +364,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) =>
val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) =>
- val mutator = getMutator(label)
+ val mutator = getEdgeMutator(label)
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
@@ -369,7 +382,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) =>
val edges = edgeGroup.map(_._1)
val idxs = edgeGroup.map(_._2)
- val mutator = getMutator(label)
+ val mutator = getEdgeMutator(label)
val zkQuorum = label.hbaseZkAddr
mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets =>
@@ -497,7 +510,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = {
val edgesWithIdx = edges.zipWithIndex
val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) =>
- getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
+ getEdgeMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2)))
}
Future.sequence(futures).map { ls =>
@@ -507,7 +520,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = {
val label = edge.innerLabel
- val mutator = getMutator(label)
+ val mutator = getEdgeMutator(label)
mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index 6ed78b0..5e2c168 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -31,9 +31,9 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.model.ModelManager
+import org.apache.s2graph.core.fetcher.FetcherManager
import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
-import org.apache.s2graph.core.storage.{MutateResponse, Storage}
+import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
import org.apache.s2graph.core.types.{InnerValLike, VertexId}
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
import org.apache.tinkerpop.gremlin.structure
@@ -69,7 +69,7 @@ trait S2GraphLike extends Graph {
val traversalHelper: TraversalHelper
- val modelManager: ModelManager
+ val modelManager: FetcherManager
lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
lazy val MaxBackOff: Int = config.getInt("max.back.off")
@@ -93,13 +93,20 @@ trait S2GraphLike extends Graph {
def getStorage(label: Label): Storage
- def getFetcher(column: ServiceColumn): Fetcher
+ def getVertexFetcher(column: ServiceColumn): VertexFetcher
- def getFetcher(label: Label): Fetcher
+ def getVertexBulkFetcher(): VertexBulkFetcher
- def getMutator(label: Label): Mutator
+ def getEdgeFetcher(label: Label): EdgeFetcher
- def getMutator(column: ServiceColumn): Mutator
+ def getEdgeBulkFetcher(): EdgeBulkFetcher
+
+ /** optional */
+ def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher
+
+ def getEdgeMutator(label: Label): EdgeMutator
+
+ def getVertexMutator(column: ServiceColumn): VertexMutator
def flushStorage(): Unit
@@ -204,7 +211,7 @@ trait S2GraphLike extends Graph {
if (ids.isEmpty) {
//TODO: default storage need to be fixed.
- Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator
+ Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator
} else {
val vertices = ids.collect {
case s2Vertex: S2VertexLike => s2Vertex
@@ -229,7 +236,7 @@ trait S2GraphLike extends Graph {
def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
if (edgeIds.isEmpty) {
// FIXME
- Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator
+ Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator
} else {
Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
index d19dd1f..0a4a49b 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala
@@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) {
val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) =>
for {
prev <- prevFuture
- cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
+ cur <- graph.getEdgeFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges)
} yield {
prev ++ reqWithIdxs.map(_._2).zip(cur).toMap
}
@@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) {
*/
graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess))
} else {
- graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
case _ =>
@@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) {
* read: x
* write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices)
*/
- graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
+ graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum)
}
ret
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
new file mode 100644
index 0000000..cbebab5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexBulkFetcher {
+ def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
new file mode 100644
index 0000000..5c10d18
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexFetcher {
+ def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+ def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+ def close(): Unit = {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
new file mode 100644
index 0000000..18be890
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import org.apache.s2graph.core.storage.MutateResponse
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait VertexMutator {
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
new file mode 100644
index 0000000..26db7ff
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.utils.{Importer, logger}
+import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/** Constants shared by every FetcherManager instance. */
+object FetcherManager {
+ // Config key whose value is the class name that initImporter / initFetcher load reflectively.
+ val ClassNameKey = "className"
+}
+
+/**
+ * Keeps one EdgeFetcher per label and replaces it once that label's importer has run.
+ *
+ * Importers and fetchers are created reflectively from the class name stored under
+ * `ClassNameKey`; both must expose a constructor taking a single S2GraphLike.
+ *
+ * @param s2GraphLike graph instance handed to every importer / fetcher that is created.
+ */
+class FetcherManager(s2GraphLike: S2GraphLike) {
+
+  import FetcherManager._
+
+  // Written from Future callbacks (arbitrary threads) and read by getFetcher,
+  // so a plain mutable.Map is not safe here; TrieMap gives atomic put/get.
+  private val fetcherPool = scala.collection.concurrent.TrieMap.empty[String, EdgeFetcher]
+
+  // At most one import per label may be in flight; the entry is removed when the import finishes.
+  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
+
+  /** Key used both for the import lock and for the fetcher pool. */
+  def toImportLockKey(label: Label): String = label.label
+
+  /**
+   * Look up the fetcher registered for `label`.
+   *
+   * @throws IllegalStateException when no import has completed for this label yet.
+   */
+  def getFetcher(label: Label): EdgeFetcher = {
+    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
+  }
+
+  /** Reflectively instantiate the Importer named by `className` in `config`. */
+  def initImporter(config: Config): Importer = {
+    val className = config.getString(ClassNameKey)
+
+    Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Importer]
+  }
+
+  /**
+   * Reflectively instantiate the EdgeFetcher named by `className` in `config` and call its `init`.
+   * Instantiation and `init` run on the calling thread; the returned Future is already completed.
+   */
+  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = {
+    val className = config.getString(ClassNameKey)
+
+    val fetcher = Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[EdgeFetcher]
+
+    fetcher.init(config)
+
+    Future.successful(fetcher)
+  }
+
+  /**
+   * Start importing the model for `label` unless an import for it is already running.
+   *
+   * The returned Future completes immediately with the importer that owns the (possibly
+   * already running) import; it does NOT wait for the import to finish. Progress is
+   * observable through the importer's status, and failures are logged.
+   *
+   * @param label  label whose fetcher should be (re)built.
+   * @param config must contain an "importer" and a "fetcher" sub-config.
+   */
+  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
+    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
+      override def apply(k: String): Importer = {
+        val importer = initImporter(config.getConfig("importer"))
+
+        //TODO: Update Label's extra options.
+        importer
+          .run(config.getConfig("importer"))
+          // flatMap (not map): the lock must stay held, and failures must stay visible,
+          // until the new fetcher has actually been registered.
+          .flatMap { finished =>
+            logger.info(s"Close importer")
+            finished.close()
+
+            initFetcher(config.getConfig("fetcher")).map { fetcher =>
+              finished.setStatus(true)
+
+              // Atomic swap: readers never observe the label without a fetcher.
+              fetcherPool
+                .put(k, fetcher)
+                .foreach { oldFetcher =>
+                  logger.info(s"Delete old storage ($k) => $oldFetcher")
+                  oldFetcher.close()
+                }
+            }
+          }
+          .onComplete { result =>
+            // The caller only receives the importer itself, so log failures here or they vanish.
+            result.failed.foreach(ex => logger.error(s"Import failed: $k", ex))
+            logger.info(s"ImportLock release: $k")
+            ImportLock.remove(k)
+          }
+
+        importer
+      }
+    })
+
+    Future.successful(importer)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
new file mode 100644
index 0000000..bf90d69
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Reference implementation of the EdgeFetcher interface.
+ * It never touches storage: every query request is answered with the same
+ * constant set of edges, one per entry in `ranges`.
+ */
+class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  val builder = graph.elementBuilder
+  val ranges = (0 until 10)
+
+  /**
+   * Answer each request with edges from the request's vertex to synthetic
+   * target vertices whose ids are the entries of `ranges`, each scored 1.0.
+   * `prevStepEdges` is not consulted.
+   */
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    // Builds the constant result for a single request.
+    def constantResult(request: QueryRequest): StepResult = {
+      val param = request.queryParam
+
+      val generated = for (i <- ranges) yield {
+        val targetId = builder.newVertexId(param.label.service, param.label.tgtColumnWithDir(param.labelWithDir.dir), i.toString)
+
+        graph.toEdge(request.vertex.innerIdVal,
+          targetId.innerId.value, param.label.label, param.direction)
+      }
+
+      val scored = for (edge <- generated) yield EdgeWithScore(edge, 1.0, param.label)
+      StepResult(scored, Nil, Nil)
+    }
+
+    Future.successful(queryRequests.map(constantResult))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
deleted file mode 100644
index 189a6d0..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import java.util.concurrent.atomic.AtomicInteger
-
-trait ImportStatus {
- val done: AtomicInteger
-
- def isCompleted: Boolean
-
- def percentage: Int
-
- val total: Int
-}
-
-class ImportRunningStatus(val total: Int) extends ImportStatus {
- require(total > 0, s"Total should be positive: $total")
-
- val done = new AtomicInteger(0)
-
- def isCompleted: Boolean = total == done.get
-
- def percentage = 100 * done.get / total
-}
-
-case object ImportDoneStatus extends ImportStatus {
- val total = 1
-
- val done = new AtomicInteger(1)
-
- def isCompleted: Boolean = true
-
- def percentage = 100
-}
-
-object ImportStatus {
- def apply(total: Int): ImportStatus = new ImportRunningStatus(total)
-
- def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] =
- Some((importResult.isCompleted, importResult.total, importResult.done.get))
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
deleted file mode 100644
index e3084dd..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import java.io.File
-
-import com.typesafe.config.Config
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.s2graph.core.{Fetcher, S2GraphLike}
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object Importer {
- def toHDFSConfiguration(hdfsConfDir: String): Configuration = {
- val conf = new Configuration
-
- val hdfsConfDirectory = new File(hdfsConfDir)
- if (hdfsConfDirectory.exists()) {
- if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) {
- throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.")
- }
-
- val path = hdfsConfDirectory.getAbsolutePath
- conf.addResource(new Path(s"file:///$path/core-site.xml"))
- conf.addResource(new Path(s"file:///$path/hdfs-site.xml"))
- } else {
- logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..")
- }
- conf
- }
-}
-
-trait Importer {
- @volatile var isFinished: Boolean = false
-
- def run(config: Config)(implicit ec: ExecutionContext): Future[Importer]
-
- def status: Boolean = isFinished
-
- def setStatus(otherStatus: Boolean): Boolean = {
- this.isFinished = otherStatus
- this.isFinished
- }
-
- def close(): Unit
-}
-
-case class IdentityImporter(graph: S2GraphLike) extends Importer {
- override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
- Future.successful(this)
- }
-
- override def close(): Unit = {}
-}
-
-object HDFSImporter {
-
- import scala.collection.JavaConverters._
-
- val PathsKey = "paths"
- val HDFSConfDirKey = "hdfsConfDir"
-
- def extractPaths(config: Config): Map[String, String] = {
- config.getConfigList(PathsKey).asScala.map { e =>
- val key = e.getString("src")
- val value = e.getString("tgt")
-
- key -> value
- }.toMap
- }
-}
-
-case class HDFSImporter(graph: S2GraphLike) extends Importer {
-
- import HDFSImporter._
-
- override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
- Future {
- val paths = extractPaths(config)
- val hdfsConfiDir = config.getString(HDFSConfDirKey)
-
- val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir)
- val fs = FileSystem.get(hadoopConfig)
-
- def copyToLocal(remoteSrc: String, localSrc: String): Unit = {
- val remoteSrcPath = new Path(remoteSrc)
- val localSrcPath = new Path(localSrc)
-
- fs.copyToLocalFile(remoteSrcPath, localSrcPath)
- }
-
- paths.foreach { case (srcPath, tgtPath) =>
- copyToLocal(srcPath, tgtPath)
- }
-
- this
- }
- }
-
- // override def status: ImportStatus = ???
-
- override def close(): Unit = {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
deleted file mode 100644
index 2130066..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.types.VertexId
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
- * Reference implementation for Fetcher interface.
- * it only produce constant edges.
- */
-class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher {
- val builder = graph.elementBuilder
- val ranges = (0 until 10)
-
- override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
- Future.successful(this)
- }
-
- override def fetches(queryRequests: Seq[QueryRequest],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
- val stepResultLs = queryRequests.map { queryRequest =>
- val queryParam = queryRequest.queryParam
- val edges = ranges.map { ith =>
- val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
-
- graph.toEdge(queryRequest.vertex.innerIdVal,
- tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
- }
-
- val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
- StepResult(edgeWithScores, Nil, Nil)
- }
-
- Future.successful(stepResultLs)
- }
-
- override def close(): Unit = {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
deleted file mode 100644
index 3cad13c..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.model
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.utils.logger
-import org.apache.s2graph.core.{Fetcher, S2GraphLike}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object ModelManager {
- val ClassNameKey = "className"
-}
-
-class ModelManager(s2GraphLike: S2GraphLike) {
-
- import ModelManager._
-
- private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher]
- private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
-
- def toImportLockKey(label: Label): String = label.label
-
- def getFetcher(label: Label): Fetcher = {
- fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
- }
-
- def initImporter(config: Config): Importer = {
- val className = config.getString(ClassNameKey)
-
- Class.forName(className)
- .getConstructor(classOf[S2GraphLike])
- .newInstance(s2GraphLike)
- .asInstanceOf[Importer]
- }
-
- def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
- val className = config.getString(ClassNameKey)
-
- val fetcher = Class.forName(className)
- .getConstructor(classOf[S2GraphLike])
- .newInstance(s2GraphLike)
- .asInstanceOf[Fetcher]
-
- fetcher.init(config)
- }
-
- def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
- val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
- override def apply(k: String): Importer = {
- val importer = initImporter(config.getConfig("importer"))
-
- //TODO: Update Label's extra options.
- importer
- .run(config.getConfig("importer"))
- .map { importer =>
- logger.info(s"Close importer")
- importer.close()
-
- initFetcher(config.getConfig("fetcher")).map { fetcher =>
- importer.setStatus(true)
-
- fetcherPool
- .remove(k)
- .foreach { oldFetcher =>
- logger.info(s"Delete old storage ($k) => $oldFetcher")
- oldFetcher.close()
- }
-
- fetcherPool += (k -> fetcher)
- }
- }
- .onComplete { _ =>
- logger.info(s"ImportLock release: $k")
- ImportLock.remove(k)
- }
-
- importer
- }
- })
-
- Future.successful(importer)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
new file mode 100644
index 0000000..82cc27a
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Vertex/edge mutator that serializes graph elements to SKeyValue (via serDe / io)
+ * and hands every write to optimisticMutator.writeToStorage.
+ * Strongly consistent edge writes additionally go through conflictResolver.
+ */
+class DefaultOptimisticMutator(graph: S2GraphLike,
+ serDe: StorageSerDe,
+ optimisticEdgeFetcher: OptimisticEdgeFetcher,
+ optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator {
+
+ // Helper that builds SKeyValue mutations from edges/vertices using serDe.
+ lazy val io: StorageIO = new StorageIO(graph, serDe)
+
+ // Used by mutateStrongEdges on the checkConsistency = true path.
+ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
+
+ // Single choke point for writes: delegates unchanged to the injected optimisticMutator.
+ private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+ optimisticMutator.writeToStorage(cluster, kvs, withWait)
+
+ /**
+ * Delete every edge contained in `stepInnerResult`.
+ * Per edge this writes a Put for the snapshot edge plus, for the index edges of both
+ * the edge and its duplicateEdge, a Delete and a -1 increment.
+ * All writes go to the zkQuorum of the FIRST edge's label and use withWait = true.
+ * NOTE(review): `requestTs` and `retryNum` are not read anywhere in this body.
+ *
+ * @return true when the result is empty or every write reported success.
+ */
+ def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ // Safe: the empty case was handled above.
+ val head = stepInnerResult.edgeWithScores.head
+ val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgeWithScores
+ } yield {
+ val edge = edgeWithScore.edge
+
+ // Despite the name, this is the snapshot edge itself, written as a Put.
+ val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+ // Forward direction: delete each index edge and apply a -1 increment for it.
+ val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ /* reverted direction */
+ val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+ writeToStorage(zkQuorum, mutations, withWait = true)
+ }
+
+ Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+ }
+ }
+
+ /**
+ * Mutate a single vertex according to `vertex.op`:
+ * "delete" writes the vertex's key values as Deletes, "deleteAll" only logs and
+ * reports success without writing anything, and any other op writes io.buildPutsAll(vertex).
+ */
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ if (vertex.op == GraphUtil.operations("delete")) {
+ writeToStorage(zkQuorum,
+ serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+ } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+ logger.info(s"deleteAll for vertex is truncated. $vertex")
+ Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+ } else {
+ writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+ }
+ }
+
+ /**
+ * Mutate edges without the snapshot fetch / conflict resolution used by mutateStrongEdges.
+ * All mutations for all edges are written in ONE writeToStorage call, so every returned
+ * (index, success) pair carries the same success flag.
+ */
+ def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+ val mutations = _edges.flatMap { edge =>
+ // None is passed as the first argument; presumably "no existing snapshot edge" (confirm in S2Edge).
+ val (_, edgeUpdate) =
+ if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+ else S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+ // Fire-and-forget: the Future returned for bufferIncr is discarded, so its outcome does not affect the result.
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+ io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+ }
+
+ writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+ _edges.zipWithIndex.map { case (edge, idx) =>
+ idx -> ret.isSuccess
+ }
+ }
+ }
+
+ /**
+ * Mutate edges with conflict resolution on the snapshot edge.
+ * Edges are grouped by (label, source id, target id); each group is resolved together and
+ * shares one success flag. The result is ordered like `_edges`.
+ */
+ def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ // Mutates one group of edges. The only call site in this method passes checkConsistency = true.
+ def mutateEdgesInner(edges: Seq[S2EdgeLike],
+ checkConsistency: Boolean,
+ withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ assert(edges.nonEmpty)
+ // TODO:: remove after code review: unreachable code
+ if (!checkConsistency) {
+
+ val futures = edges.map { edge =>
+ val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+ val mutations =
+ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+ // Fire-and-forget, same as in mutateWeakEdges.
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+ writeToStorage(zkQuorum, mutations, withWait)
+ }
+ Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+ } else {
+ // Fetch the snapshot edge for the first edge of the group, then let conflictResolver apply the group.
+ optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+ conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+ }
+ }
+ }
+
+ // Keep each edge's original position so the result can be put back in input order.
+ val edgeWithIdxs = _edges.zipWithIndex
+ val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+ (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+ } toSeq
+
+ val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+ // After deleteAll, process others
+ val mutateEdgeFutures = edges.toList match {
+ case head :: tail =>
+ val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+ //TODO: decide what we will do on failure on vertex put
+ // NOTE(review): vertex puts go to the label's hbaseZkAddr, not to the `zkQuorum` parameter.
+ val puts = io.buildVertexPutsAsync(head)
+ val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+ Seq(edgeFuture, vertexFuture)
+ case Nil => Nil
+ }
+
+ val composed = for {
+ // deleteRet <- Future.sequence(deleteAllFutures)
+ mutateRet <- Future.sequence(mutateEdgeFutures)
+ } yield mutateRet
+
+ // A group succeeds only if both the edge mutation and the vertex put succeeded.
+ composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+ }
+
+ Future.sequence(mutateEdges).map { squashedRets =>
+ squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+ }
+ }
+
+ /**
+ * For each edge, build count increments for the index edges of its related edges
+ * (filtered by EdgeMutate.filterIndexOption) and write them in one call per edge.
+ * The increment amount is the edge's LabelMeta.count property, read as a Long.
+ *
+ * @return one MutateResponse per input edge, in input order.
+ */
+ def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+ val futures = for {
+ edge <- edges
+ } yield {
+ val kvs = for {
+ relEdge <- edge.relatedEdges
+ edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+ } yield {
+ val countWithTs = edge.propertyValueInner(LabelMeta.count)
+ val countVal = countWithTs.innerVal.toString().toLong
+ // Only the first key value produced by buildIncrementsCountAsync is used.
+ io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+ }
+ writeToStorage(zkQuorum, kvs, withWait = withWait)
+ }
+
+ Future.sequence(futures)
+ }
+
+ /**
+ * Write the key values produced by io.buildDegreePuts(edge, degreeVal), always with withWait = true.
+ */
+ def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ val kvs = io.buildDegreePuts(edge, degreeVal)
+
+ writeToStorage(zkQuorum, kvs, withWait = true)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..4111cc4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Read-side companion of the optimistic write path: looks up the snapshot edge
+ * (and the raw key value it was decoded from) for a given edge.
+ */
+trait OptimisticEdgeFetcher {
+  val io: StorageIO
+
+  /** Storage-specific lookup of the raw key values for `edge` under `queryRequest`. */
+  protected def fetchKeyValues(queryRequest: QueryRequest,
+                               edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+  /**
+   * Fetch the snapshot edge for `edge`.
+   *
+   * @return (decoded snapshot edge, first raw key value); both None when storage returned nothing.
+   *         Any failure is logged and replaced by a FetchTimeoutException carrying the edge's log string.
+   */
+  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
+    val param = QueryParam(labelName = edge.innerLabel.label,
+      direction = GraphUtil.fromDirection(edge.getDir()),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val query = Query.toQuery(Seq(edge.srcVertex), Seq(param))
+    val request = QueryRequest(query, 0, edge.srcVertex, param)
+
+    val lookup = fetchKeyValues(request, edge).map { kvs =>
+      // Only the first key value is relevant; an empty result means "no snapshot edge".
+      val found: (Option[S2EdgeLike], Option[SKeyValue]) = kvs.headOption match {
+        case Some(first) =>
+          import CanSKeyValue._
+          val decoded = io.toSnapshotEdge(first, request, isInnerCall = true, parentEdges = Nil)
+          (decoded, Some(first))
+        case None =>
+          (None, None)
+      }
+      found
+    }
+
+    lookup.recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      throw new FetchTimeoutException(s"${edge.toLogString}")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
new file mode 100644
index 0000000..22269df
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Low-level write operations a storage backend must provide for the optimistic mutation path.
+ */
+trait OptimisticMutator {
+ /**
+ * Decide how to store the given key values into storage using the storage's client.
+ * The response should report success only when every key value was stored.
+ * Each storage implementation is assumed to hold its client as a member.
+ *
+ * @param cluster where these key values should be stored.
+ * @param kvs the sequence of SKeyValue to store.
+ * @param withWait flag controlling whether to wait for the ack from storage.
+ * Note that in AsynchbaseStorage (which supports asynchronous operations), even when true,
+ * the thread is never blocked; the work is submitted and the event loop notifies when storage acks.
+ * @return the ack message from storage.
+ */
+ def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+ /**
+ * Write requestKeyValue into storage only if the value currently stored matches the expected one.
+ * Only the SnapshotEdge is used as the place for the lock, so this method only changes the SnapshotEdge.
+ *
+ * Most importantly, this has to be an 'atomic' operation.
+ * While this operation is mutating requestKeyValue's snapshotEdge, any other thread must
+ * either be blocked or fail with a write-write conflict.
+ *
+ * Also, while this method is running, fetchSnapshotEdgeKeyValues should be synchronized with it
+ * so that readers do not observe wrong data.
+ *
+ * The best approach is to use the storage's own concurrency control (pessimistic or optimistic),
+ * such as transactions or compareAndSet.
+ *
+ * For example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee atomicity.
+ * A storage that does not support concurrency control can instead maintain manual locks
+ * in its implementation that synchronize reads (fetchSnapshotEdgeKeyValues)
+ * and writes (writeLock).
+ *
+ * @param requestKeyValue the key value to write (per the description above, a SnapshotEdge entry).
+ * @param expectedOpt the value expected to be currently stored; presumably None means no stored value is expected, confirm against implementations.
+ * @return the storage's response, delivered asynchronously.
+ */
+ def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index d2500a6..36ecfcb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -34,52 +34,30 @@ abstract class Storage(val graph: S2GraphLike,
val management: StorageManagement
/*
- * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage
- * then convert them into Edge/Vertex
- */
- val reader: StorageReadable
-
- /*
- * Serialize Edge/Vertex, to common KeyValue, SKeyValue that
- * can be stored aligned to backend storage's physical schema.
- * Also Deserialize storage backend's KeyValue to SKeyValue.
- */
+ * Serializes an Edge/Vertex into the common key-value form, SKeyValue, which
+ * can be stored according to the backend storage's physical schema.
+ * Also deserializes the storage backend's KeyValue into SKeyValue.
+ */
val serDe: StorageSerDe
- /*
- * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex).
- */
- val mutator: Mutator
+ val edgeFetcher: EdgeFetcher
- /*
- * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
- * Note that it require storage backend specific implementation for serialize/deserialize.
- */
- lazy val io: StorageIO = new StorageIO(graph, serDe)
+ val edgeBulkFetcher: EdgeBulkFetcher
- /*
- * Common helper to resolve write-write conflict on snapshot edge with same EdgeId.
- * Note that it require storage backend specific implementations for
- * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO
- */
-// lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader)
-// lazy val mutationHelper: MutationHelper = new MutationHelper(this)
+ val vertexFetcher: VertexFetcher
+ val vertexBulkFetcher: VertexBulkFetcher
- /** Fetch **/
- def fetches(queryRequests: Seq[QueryRequest],
- prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] =
- reader.fetches(queryRequests, prevStepEdges)
+ val edgeMutator: EdgeMutator
- def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] =
- reader.fetchVertices(vertices)
+ val vertexMutator: VertexMutator
- def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = reader.fetchEdgesAll()
-
- def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = reader.fetchVerticesAll()
+ /*
+ * Common helper to translate SKeyValue to Edge/Vertex and vice versa.
+ * Note that it requires a storage-backend-specific implementation for serialization/deserialization.
+ */
+ lazy val io: StorageIO = new StorageIO(graph, serDe)
- def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] =
- reader.fetchSnapshotEdgeInner(edge)
/** Management **/
def flush(): Unit = management.flush()
@@ -94,24 +72,4 @@ abstract class Storage(val graph: S2GraphLike,
def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName)
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] =
- mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum)
-
- def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutator.mutateVertex(zkQuorum: String, vertex, withWait)
-
- def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] =
- mutator.mutateStrongEdges(zkQuorum, _edges, withWait)
-
-
- def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] =
- mutator.mutateWeakEdges(zkQuorum, _edges, withWait)
-
- def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] =
- mutator.incrementCounts(zkQuorum, edges, withWait)
-
- def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] =
- mutator.updateDegree(zkQuorum, edge, degreeVal)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
index b10feb9..c3abd03 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala
@@ -27,7 +27,7 @@ import org.apache.s2graph.core.utils.logger
import scala.concurrent.{ExecutionContext, Future}
-trait StorageReadable extends Fetcher {
+trait StorageReadable extends EdgeFetcher {
val io: StorageIO
val serDe: StorageSerDe
// /**
@@ -44,9 +44,14 @@ trait StorageReadable extends Fetcher {
def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+
+
+
+
+
protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
- protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+// protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
@@ -73,25 +78,25 @@ trait StorageReadable extends Fetcher {
}
}
- def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
- def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
- if (kvs.isEmpty) Nil
- else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
- }
-
- val futures = vertices.map { vertex =>
- val queryParam = QueryParam.Empty
- val q = Query.toQuery(Seq(vertex), Seq(queryParam))
- val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
-
- fetchKeyValues(queryRequest, vertex).map { kvs =>
- fromResult(kvs, vertex.serviceColumn.schemaVersion)
- } recoverWith {
- case ex: Throwable => Future.successful(Nil)
- }
- }
-
- Future.sequence(futures).map(_.flatten)
- }
+// def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = {
+// def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = {
+// if (kvs.isEmpty) Nil
+// else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq
+// }
+//
+// val futures = vertices.map { vertex =>
+// val queryParam = QueryParam.Empty
+// val q = Query.toQuery(Seq(vertex), Seq(queryParam))
+// val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam)
+//
+// fetchKeyValues(queryRequest, vertex).map { kvs =>
+// fromResult(kvs, vertex.serviceColumn.schemaVersion)
+// } recoverWith {
+// case ex: Throwable => Future.successful(Nil)
+// }
+// }
+//
+// Future.sequence(futures).map(_.flatten)
+// }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
deleted file mode 100644
index 8c2fb27..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage
-
-import org.apache.s2graph.core.Mutator
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait OptimisticMutator extends Mutator {
- /**
- * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
- * note that this should be return true on all success.
- * we assumes that each storage implementation has client as member variable.
- *
- * @param cluster : where this key values should be stored.
- * @param kvs : sequence of SKeyValue that need to be stored in storage.
- * @param withWait : flag to control wait ack from storage.
- * note that in AsynchbaseStorage(which support asynchronous operations), even with true,
- * it never block thread, but rather submit work and notified by event loop when storage send ack back.
- * @return ack message from storage.
- */
- def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
-
- /**
- * write requestKeyValue into storage if the current value in storage that is stored matches.
- * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge.
- *
- * Most important thing is this have to be 'atomic' operation.
- * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be
- * either blocked or failed on write-write conflict case.
- *
- * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to
- * prevent wrong data for read.
- *
- * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction,
- * compareAndSet to synchronize.
- *
- * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
- * for storage that does not support concurrency control, then storage implementation
- * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues)
- * and write(writeLock).
- *
- * @param requestKeyValue
- * @param expectedOpt
- * @return
- */
- def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
index bfc5bc6..18159f6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala
@@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
serDe: StorageSerDe,
io: StorageIO,
mutator: OptimisticMutator,
- fetcher: StorageReadable) {
+ optimisticEdgeFetcher: OptimisticEdgeFetcher) {
val BackoffTimeout = graph.BackoffTimeout
val MaxRetryNum = graph.MaxRetryNum
val MaxBackOff = graph.MaxBackOff
@@ -68,7 +68,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
case FetchTimeoutException(retryEdge) =>
logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}")
/* fetch failed. re-fetch should be done */
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+ optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
@@ -90,7 +90,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike,
val future = if (failedStatusCode == 0) {
// acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge.
/* fetch failed. re-fetch should be done */
- fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+ optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt)
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
new file mode 100644
index 0000000..3d25dd9
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage.hbase
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike}
+import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.{CanDefer, Extensions}
+import org.hbase.async.{HBaseClient, KeyValue}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * [[EdgeBulkFetcher]] implementation that reads edges by scanning HBase tables through an
+ * asynchbase [[HBaseClient]].
+ *
+ * @param graph  graph handle; exposed as a member, not referenced by `fetchEdgesAll`
+ * @param config configuration; exposed as a member, not referenced by `fetchEdgesAll`
+ * @param client asynchbase client used to open one scanner per HBase table
+ * @param serDe  provides the index-edge deserializer applied to every scanned cell
+ * @param io     storage IO helper; exposed as a member, not referenced by `fetchEdgesAll`
+ */
+class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike,
+                                val config: Config,
+                                val client: HBaseClient,
+                                val serDe: StorageSerDe,
+                                val io: StorageIO) extends EdgeBulkFetcher {
+  import Extensions.DeferOps
+  import CanDefer._
+  import scala.collection.JavaConverters._
+  import AsynchbaseStorage._
+
+  /**
+   * Scans the edge column family of every HBase table referenced by a label returned from
+   * `Label.findAll()` and deserializes each scanned cell as an index edge using
+   * `HBaseType.DEFAULT_VERSION`.
+   *
+   * An edge is kept only when its `innerLabel` is one of the labels mapped to the scanned
+   * table, its direction is "out", and it is not a degree edge.
+   *
+   * A single `nextRows(S2Graph.FetchAllLimit)` call is issued per table, so only that one
+   * batch of rows per table contributes to the result.
+   * NOTE(review): the scanner is neither asked for further batches nor closed explicitly
+   * here -- confirm that this is intended and does not leave scanners open on the server.
+   *
+   * @return the edges of all scanned tables, concatenated into one sequence
+   */
+  override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
+    val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
+      val distinctLabels = labels.toSet
+      val scan = AsynchbasePatcher.newScanner(client, hTableName)
+      scan.setFamily(Serializable.edgeCf)
+      scan.setMaxVersions(1)
+
+      scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { kvsLs =>
+        // Option(...) maps a null batch to an empty result.
+        Option(kvsLs).fold(Seq.empty[S2EdgeLike]) { rows =>
+          rows.asScala.flatMap { kvs =>
+            kvs.asScala.flatMap { kv =>
+              // The raw KeyValue is handed to the deserializer directly; the SKeyValue that
+              // used to be built here for every cell was never read afterwards.
+              serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+                .fromKeyValues(Seq(kv), None)
+                .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+            }
+          }
+        }
+      }
+    }
+
+    Future.sequence(futures).map(_.flatten)
+  }
+}