You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:46 UTC
[06/11] incubator-s2graph git commit: Start implement ResourceManager.
Start implement ResourceManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/be83d07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/be83d07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/be83d07c
Branch: refs/heads/master
Commit: be83d07ca5ecb271bd3678c38734d1176182c286
Parents: 43f627e
Author: DO YUNG YOON <st...@apache.org>
Authored: Wed May 9 19:25:15 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Wed May 9 19:25:15 2018 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/schema/schema.sql | 1 +
.../apache/s2graph/core/EdgeBulkFetcher.scala | 28 ---
.../org/apache/s2graph/core/EdgeFetcher.scala | 3 +
.../org/apache/s2graph/core/EdgeMutator.scala | 7 +
.../org/apache/s2graph/core/Management.scala | 57 +++++-
.../apache/s2graph/core/ResourceManager.scala | 130 +++++++++++++
.../scala/org/apache/s2graph/core/S2Graph.scala | 49 ++---
.../apache/s2graph/core/S2GraphFactory.scala | 2 +-
.../org/apache/s2graph/core/S2GraphLike.scala | 38 ++--
.../apache/s2graph/core/VertexBulkFetcher.scala | 26 ---
.../org/apache/s2graph/core/VertexFetcher.scala | 4 +
.../org/apache/s2graph/core/VertexMutator.scala | 5 +
.../s2graph/core/fetcher/FetcherManager.scala | 106 -----------
.../core/fetcher/MemoryModelEdgeFetcher.scala | 26 ++-
.../apache/s2graph/core/io/Conversions.scala | 6 +-
.../org/apache/s2graph/core/schema/Label.scala | 34 ++--
.../s2graph/core/schema/ServiceColumn.scala | 47 ++++-
.../storage/DefaultOptimisticEdgeMutator.scala | 176 +++++++++++++++++
.../core/storage/DefaultOptimisticMutator.scala | 190 -------------------
.../DefaultOptimisticVertexMutator.scala | 44 +++++
.../apache/s2graph/core/storage/Storage.scala | 4 -
.../hbase/AsynchbaseEdgeBulkFetcher.scala | 69 -------
.../storage/hbase/AsynchbaseEdgeFetcher.scala | 31 ++-
.../core/storage/hbase/AsynchbaseStorage.scala | 7 +-
.../hbase/AsynchbaseVertexBulkFetcher.scala | 63 ------
.../storage/hbase/AsynchbaseVertexFetcher.scala | 26 +++
.../storage/rocks/RocksEdgeBulkFetcher.scala | 68 -------
.../core/storage/rocks/RocksEdgeFetcher.scala | 35 +++-
.../core/storage/rocks/RocksStorage.scala | 7 +-
.../storage/rocks/RocksVertexBulkFetcher.scala | 88 ---------
.../core/storage/rocks/RocksVertexFetcher.scala | 53 ++++++
.../s2graph/core/utils/SafeUpdateCache.scala | 6 +-
.../s2graph/core/fetcher/EdgeFetcherTest.scala | 12 +-
.../apache/s2graph/graphql/GraphQLServer.scala | 8 +-
.../org/apache/s2graph/graphql/HttpServer.scala | 4 +-
35 files changed, 710 insertions(+), 750 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
index 6b9b71e..4f7f832 100644
--- a/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
+++ b/s2core/src/main/resources/org/apache/s2graph/core/schema/schema.sql
@@ -48,6 +48,7 @@ CREATE TABLE `service_columns` (
`column_name` varchar(64) NOT NULL,
`column_type` varchar(8) NOT NULL,
`schema_version` varchar(8) NOT NULL default 'v2',
+ `options` text,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_service_id_column_name` (`service_id`, `column_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
deleted file mode 100644
index 646f5f4..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import com.typesafe.config.Config
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait EdgeBulkFetcher {
- def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
index f28a161..c3760e0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala
@@ -31,5 +31,8 @@ trait EdgeFetcher {
def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]]
+ def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]]
+
def close(): Unit = {}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
index dc0099e..252d129 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala
@@ -19,11 +19,13 @@
package org.apache.s2graph.core
+import com.typesafe.config.Config
import org.apache.s2graph.core.storage.MutateResponse
import scala.concurrent.{ExecutionContext, Future}
trait EdgeMutator {
+
def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]]
def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]]
@@ -35,4 +37,9 @@ trait EdgeMutator {
def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
requestTs: Long,
retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean]
+
+ def close(): Unit = {}
+
+ def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 9046449..c3aef7a 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -95,14 +95,15 @@ object Management {
columnName: String,
columnType: String,
props: Seq[Prop],
- schemaVersion: String = DEFAULT_VERSION) = {
+ schemaVersion: String = DEFAULT_VERSION,
+ options: Option[String] = None) = {
Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
case Some(service) =>
- val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
+ val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false)
for {
Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
} yield {
@@ -304,13 +305,50 @@ class Management(graph: S2GraphLike) {
import Management._
- def importModel(labelName: String, options: String): Future[Importer] = {
- Label.updateOption(labelName, options)
+ def updateEdgeFetcher(labelName: String, options: String): Unit = {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
- val label = Label.findByName(labelName, false).getOrElse(throw new LabelNotExistException(labelName))
- val config = ConfigFactory.parseString(options)
+ updateEdgeFetcher(label, options)
+ }
+
+ def updateEdgeFetcher(label: Label, options: String): Unit = {
+ val newLabel = Label.updateOption(label, options)
+ graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true)
+ }
+
+ def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = {
+ val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist."))
+
+ updateVertexFetcher(column, options)
+ }
+
+ def updateVertexFetcher(column: ServiceColumn, options: String): Unit = {
+ val newColumn = ServiceColumn.updateOption(column, options)
+ graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, forceUpdate = true)
+ }
+
+ def updateEdgeMutator(labelName: String, options: String): Unit = {
+ val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName))
+
+ updateEdgeMutator(label, options)
+ }
+
+ def updateEdgeMutator(label: Label, options: String): Unit = {
+ val newLabel = Label.updateOption(label, options)
+ graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true)
+ }
+
+ def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = {
+ val service = Service.findByName(serviceName).getOrElse(throw new IllegalArgumentException(s"$serviceName is not exist."))
+ val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new IllegalArgumentException(s"$columnName is not exist."))
+
+ updateVertexMutator(column, options)
+ }
- graph.modelManager.importModel(label, config)(importEx)
+ def updateVertexMutator(column: ServiceColumn, options: String): Unit = {
+ val newColumn = ServiceColumn.updateOption(column, options)
+ graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true)
}
def createStorageTable(zkAddr: String,
@@ -375,14 +413,15 @@ class Management(graph: S2GraphLike) {
columnName: String,
columnType: String,
props: Seq[Prop],
- schemaVersion: String = DEFAULT_VERSION): ServiceColumn = {
+ schemaVersion: String = DEFAULT_VERSION,
+ options: Option[String] = None): ServiceColumn = {
val serviceColumnTry = Schema withTx { implicit session =>
val serviceOpt = Service.findByName(serviceName, useCache = false)
serviceOpt match {
case None => throw new RuntimeException(s"create service $serviceName has not been created.")
case Some(service) =>
- val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, useCache = false)
+ val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion, options, useCache = false)
for {
Prop(propName, defaultValue, dataType, storeInGlobalIndex) <- props
} yield {
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
new file mode 100644
index 0000000..b877603
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.s2graph.core.schema.{Label, ServiceColumn}
+import org.apache.s2graph.core.utils.SafeUpdateCache
+import scala.concurrent.ExecutionContext
+
+
+object ResourceManager {
+
+ import SafeUpdateCache._
+
+ import scala.collection.JavaConverters._
+
+ val ClassNameKey = "className"
+ val EdgeFetcherKey = classOf[EdgeFetcher].getClass().getName
+
+ val VertexFetcherKey = classOf[VertexFetcher].getClass().getName
+
+ val EdgeMutatorKey = classOf[EdgeMutator].getClass.getName
+ val VertexMutatorKey = classOf[VertexMutator].getClass.getName
+
+ val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> 1000, TtlKey -> -1).asJava)
+}
+
+class ResourceManager(graph: S2GraphLike,
+ _config: Config)(implicit ec: ExecutionContext) {
+
+ import ResourceManager._
+
+ import scala.collection.JavaConverters._
+
+ val cache = new SafeUpdateCache(_config)
+
+ def getAllVertexFetchers(): Seq[VertexFetcher] = {
+ cache.asMap().asScala.toSeq.collect { case (_, (obj: VertexFetcher, _, _)) => obj }
+ }
+
+ def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+ cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
+ }
+
+ def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = {
+ val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName
+ cache.withCache(cacheKey, false, forceUpdate) {
+ column.toFetcherConfig.map { fetcherConfig =>
+ val className = fetcherConfig.getString(ClassNameKey)
+ val fetcher = Class.forName(className)
+ .getConstructor(classOf[S2GraphLike])
+ .newInstance(graph)
+ .asInstanceOf[VertexFetcher]
+
+ fetcher.init(fetcherConfig)
+
+ fetcher
+ }
+ }
+ }
+
+ def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = {
+ val cacheKey = EdgeFetcherKey + "_" + label.label
+
+ cache.withCache(cacheKey, false, forceUpdate) {
+ label.toFetcherConfig.map { fetcherConfig =>
+ val className = fetcherConfig.getString(ClassNameKey)
+ val fetcher = Class.forName(className)
+ .getConstructor(classOf[S2GraphLike])
+ .newInstance(graph)
+ .asInstanceOf[EdgeFetcher]
+
+ fetcher.init(fetcherConfig)
+
+ fetcher
+ }
+ }
+ }
+
+ def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = {
+ val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName
+ cache.withCache(cacheKey, false, forceUpdate) {
+ column.toMutatorConfig.map { mutatorConfig =>
+ val className = mutatorConfig.getString(ClassNameKey)
+ val fetcher = Class.forName(className)
+ .getConstructor(classOf[S2GraphLike])
+ .newInstance(graph)
+ .asInstanceOf[VertexMutator]
+
+ fetcher.init(mutatorConfig)
+
+ fetcher
+ }
+ }
+ }
+
+ def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = {
+ val cacheKey = EdgeMutatorKey + "_" + label.label
+ cache.withCache(cacheKey, false, forceUpdate) {
+ label.toMutatorConfig.map { mutatorConfig =>
+ val className = mutatorConfig.getString(ClassNameKey)
+ val fetcher = Class.forName(className)
+ .getConstructor(classOf[S2GraphLike])
+ .newInstance(graph)
+ .asInstanceOf[EdgeMutator]
+
+ fetcher.init(mutatorConfig)
+
+ fetcher
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index c4cb48f..09fd55e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -20,30 +20,27 @@
package org.apache.s2graph.core
import java.util
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ExecutorService, Executors}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.s2graph.core.index.IndexProvider
import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy
-import org.apache.s2graph.core.fetcher.FetcherManager
import org.apache.s2graph.core.schema._
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.storage.rocks.RocksStorage
import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
import org.apache.s2graph.core.types._
-import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger}
-import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies}
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer
+import org.apache.s2graph.core.utils.{Extensions, Importer, logger}
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies
import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph}
import scala.collection.JavaConversions._
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.{Random, Try}
+import scala.util.Try
object S2Graph {
@@ -94,6 +91,7 @@ object S2Graph {
val numOfThread = Runtime.getRuntime.availableProcessors()
val threadPool = Executors.newFixedThreadPool(numOfThread)
val ec = ExecutionContext.fromExecutor(threadPool)
+ val resourceManagerEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numOfThread))
val DefaultServiceName = ""
val DefaultColumnName = "vertex"
@@ -187,7 +185,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
override val management = new Management(this)
- override val modelManager = new FetcherManager(this)
+ override val resourceManager: ResourceManager = new ResourceManager(this, config)(S2Graph.resourceManagerEc)
override val indexProvider = IndexProvider.apply(config)
@@ -250,32 +248,39 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
storagePool.getOrElse(s"label:${label.label}", defaultStorage)
}
- //TODO:
+ /* Currently, each getter on Fetcher and Mutator missing proper implementation
+ * Please discuss what is proper way to maintain resources here and provide
+ * right implementation(S2GRAPH-213).
+ * */
override def getVertexFetcher(column: ServiceColumn): VertexFetcher = {
- getStorage(column.service).vertexFetcher
- }
- override def getVertexBulkFetcher: VertexBulkFetcher = {
- defaultStorage.vertexBulkFetcher
+ resourceManager.getOrElseUpdateVertexFetcher(column)
+ .getOrElse(defaultStorage.vertexFetcher)
}
override def getEdgeFetcher(label: Label): EdgeFetcher = {
- if (label.fetchConfigExist) modelManager.getFetcher(label)
- else getStorage(label).edgeFetcher
+ resourceManager.getOrElseUpdateEdgeFetcher(label)
+ .getOrElse(defaultStorage.edgeFetcher)
}
- override def getEdgeBulkFetcher: EdgeBulkFetcher = {
- defaultStorage.edgeBulkFetcher
+ override def getAllVertexFetchers(): Seq[VertexFetcher] = {
+ resourceManager.getAllVertexFetchers()
+ }
+
+ override def getAllEdgeFetchers(): Seq[EdgeFetcher] = {
+ resourceManager.getAllEdgeFetchers()
}
override def getVertexMutator(column: ServiceColumn): VertexMutator = {
- getStorage(column.service).vertexMutator
+ resourceManager.getOrElseUpdateVertexMutator(column)
+ .getOrElse(defaultStorage.vertexMutator)
}
override def getEdgeMutator(label: Label): EdgeMutator = {
- getStorage(label).edgeMutator
+ resourceManager.getOrElseUpdateEdgeMutator(label)
+ .getOrElse(defaultStorage.edgeMutator)
}
- /** optional */
+ //TODO:
override def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher = {
// getStorage(label).optimisticEdgeFetcher
null
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
index cce05af..0a667ef 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphFactory.scala
@@ -66,7 +66,7 @@ object S2GraphFactory {
val DefaultService = management.createService(DefaultServiceName, "localhost", "s2graph", 0, None).get
// Management.deleteColumn(DefaultServiceName, DefaultColumnName)
- val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, useCache = false)
+ val DefaultColumn = ServiceColumn.findOrInsert(DefaultService.id.get, DefaultColumnName, Some("integer"), HBaseType.DEFAULT_VERSION, options = None, useCache = false)
val DefaultColumnMetas = {
ColumnMeta.findOrInsert(DefaultColumn.id.get, "test", "string", "-", storeInGlobalIndex = true, useCache = false)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
index 5e2c168..99423d6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala
@@ -19,22 +19,20 @@
package org.apache.s2graph.core
-import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
-import java.util.concurrent.atomic.AtomicLong
import java.lang.{Boolean => JBoolean, Long => JLong}
+import java.util
import java.util.Optional
+import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.{CompletableFuture, TimeUnit}
import com.typesafe.config.Config
import org.apache.commons.configuration.Configuration
import org.apache.s2graph.core.GraphExceptions.LabelNotExistException
-import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName}
import org.apache.s2graph.core.features.{S2Features, S2GraphVariables}
import org.apache.s2graph.core.index.IndexProvider
-import org.apache.s2graph.core.fetcher.FetcherManager
import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn}
import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage}
-import org.apache.s2graph.core.types.{InnerValLike, VertexId}
+import org.apache.s2graph.core.types.VertexId
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer
import org.apache.tinkerpop.gremlin.structure
import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions
@@ -44,10 +42,10 @@ import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Element, Graph,
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
trait S2GraphLike extends Graph {
@@ -69,7 +67,7 @@ trait S2GraphLike extends Graph {
val traversalHelper: TraversalHelper
- val modelManager: FetcherManager
+ val resourceManager: ResourceManager
lazy val MaxRetryNum: Int = config.getInt("max.retry.number")
lazy val MaxBackOff: Int = config.getInt("max.back.off")
@@ -95,11 +93,11 @@ trait S2GraphLike extends Graph {
def getVertexFetcher(column: ServiceColumn): VertexFetcher
- def getVertexBulkFetcher(): VertexBulkFetcher
-
def getEdgeFetcher(label: Label): EdgeFetcher
- def getEdgeBulkFetcher(): EdgeBulkFetcher
+ def getAllVertexFetchers(): Seq[VertexFetcher]
+
+ def getAllEdgeFetchers(): Seq[EdgeFetcher]
/** optional */
def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher
@@ -211,7 +209,13 @@ trait S2GraphLike extends Graph {
if (ids.isEmpty) {
//TODO: default storage need to be fixed.
- Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator
+ val futures = getAllVertexFetchers.map { vertexFetcher =>
+ vertexFetcher.fetchVerticesAll()
+ }
+
+ val future = Future.sequence(futures)
+
+ Await.result(future, WaitTimeout).flatten.iterator
} else {
val vertices = ids.collect {
case s2Vertex: S2VertexLike => s2Vertex
@@ -236,7 +240,13 @@ trait S2GraphLike extends Graph {
def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = {
if (edgeIds.isEmpty) {
// FIXME
- Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator
+ val futures = getAllEdgeFetchers().map { edgeFetcher =>
+ edgeFetcher.fetchEdgesAll()
+ }
+
+ val future = Future.sequence(futures)
+
+ Await.result(future, WaitTimeout).flatten.iterator
} else {
Await.result(edgesAsync(edgeIds: _*), WaitTimeout)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
deleted file mode 100644
index cbebab5..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core
-
-import scala.concurrent.{ExecutionContext, Future}
-
-trait VertexBulkFetcher {
- def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
index 5c10d18..b641e7f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala
@@ -26,6 +26,10 @@ import scala.concurrent.{ExecutionContext, Future}
trait VertexFetcher {
def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+
+ def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]]
+
def close(): Unit = {}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
index 18be890..d1c8ecf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala
@@ -19,10 +19,15 @@
package org.apache.s2graph.core
+import com.typesafe.config.Config
import org.apache.s2graph.core.storage.MutateResponse
import scala.concurrent.{ExecutionContext, Future}
trait VertexMutator {
+ def close(): Unit = {}
+
+ def init(config: Config)(implicit ec: ExecutionContext): Unit = {}
+
def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
deleted file mode 100644
index 26db7ff..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.fetcher
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.utils.{Importer, logger}
-import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-object FetcherManager {
- val ClassNameKey = "className"
-}
-
-class FetcherManager(s2GraphLike: S2GraphLike) {
-
- import FetcherManager._
-
- private val fetcherPool = scala.collection.mutable.Map.empty[String, EdgeFetcher]
-
- private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
-
- def toImportLockKey(label: Label): String = label.label
-
- def getFetcher(label: Label): EdgeFetcher = {
- fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
- }
-
- def initImporter(config: Config): Importer = {
- val className = config.getString(ClassNameKey)
-
- Class.forName(className)
- .getConstructor(classOf[S2GraphLike])
- .newInstance(s2GraphLike)
- .asInstanceOf[Importer]
- }
-
- def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = {
- val className = config.getString(ClassNameKey)
-
- val fetcher = Class.forName(className)
- .getConstructor(classOf[S2GraphLike])
- .newInstance(s2GraphLike)
- .asInstanceOf[EdgeFetcher]
-
- fetcher.init(config)
-
- Future.successful(fetcher)
- }
-
- def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
- val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
- override def apply(k: String): Importer = {
- val importer = initImporter(config.getConfig("importer"))
-
- //TODO: Update Label's extra options.
- importer
- .run(config.getConfig("importer"))
- .map { importer =>
- logger.info(s"Close importer")
- importer.close()
-
- initFetcher(config.getConfig("fetcher")).map { fetcher =>
- importer.setStatus(true)
-
- fetcherPool
- .remove(k)
- .foreach { oldFetcher =>
- logger.info(s"Delete old storage ($k) => $oldFetcher")
- oldFetcher.close()
- }
-
- fetcherPool += (k -> fetcher)
- }
- }
- .onComplete { _ =>
- logger.info(s"ImportLock release: $k")
- ImportLock.remove(k)
- }
-
- importer
- }
- })
-
- Future.successful(importer)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
index bf90d69..110d615 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
@@ -37,18 +37,26 @@ class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher {
override def fetches(queryRequests: Seq[QueryRequest],
prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
val stepResultLs = queryRequests.map { queryRequest =>
- val queryParam = queryRequest.queryParam
- val edges = ranges.map { ith =>
- val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+ toEdges(queryRequest)
+ }
- graph.toEdge(queryRequest.vertex.innerIdVal,
- tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
- }
+ Future.successful(stepResultLs)
+ }
- val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
- StepResult(edgeWithScores, Nil, Nil)
+ override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
+ Future.successful(Nil)
+ }
+
+ private def toEdges(queryRequest: QueryRequest) = {
+ val queryParam = queryRequest.queryParam
+ val edges = ranges.map { ith =>
+ val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString)
+
+ graph.toEdge(queryRequest.vertex.innerIdVal,
+ tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction)
}
- Future.successful(stepResultLs)
+ val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+ StepResult(edgeWithScores, Nil, Nil)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
index 15f1231..948cdd8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/io/Conversions.scala
@@ -75,7 +75,8 @@ object Conversions {
(JsPath \ "serviceId").read[Int] and
(JsPath \ "columnName").read[String] and
(JsPath \ "columnType").read[String] and
- (JsPath \ "schemaVersion").read[String]
+ (JsPath \ "schemaVersion").read[String] and
+ (JsPath \ "options").readNullable[String]
)(ServiceColumn.apply _)
implicit val serviceColumnWrites: Writes[ServiceColumn] = (
@@ -83,7 +84,8 @@ object Conversions {
(JsPath \ "serviceId").write[Int] and
(JsPath \ "columnName").write[String] and
(JsPath \ "columnType").write[String] and
- (JsPath \ "schemaVersion").write[String]
+ (JsPath \ "schemaVersion").write[String] and
+ (JsPath \ "options").writeNullable[String]
)(unlift(ServiceColumn.unapply))
implicit val columnMetaReads: Reads[ColumnMeta] = (
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index cca1769..f3ce5e0 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -60,7 +60,9 @@ object Label extends SQLSyntaxSupport[Label] {
select *
from labels
where label = ${labelName}
- and deleted_at is null """.map { rs => Label(rs) }.single.apply()
+ and deleted_at is null """.map { rs =>
+ Label(rs)
+ }.single.apply()
if (useCache) withCache(cacheKey)(labelOpt)
else labelOpt
@@ -109,15 +111,17 @@ object Label extends SQLSyntaxSupport[Label] {
.map { rs => Label(rs) }.single.apply())
}
- def findById(id: Int)(implicit session: DBSession = AutoSession): Label = {
+ def findById(id: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Label = {
val cacheKey = className + "id=" + id
- withCache(cacheKey)(
- sql"""
+ lazy val sql = sql"""
select *
from labels
where id = ${id}
and deleted_at is null"""
- .map { rs => Label(rs) }.single.apply()).get
+ .map { rs => Label(rs) }.single.apply()
+
+ if (useCache) withCache(cacheKey)(sql).get
+ else sql.get
}
def findByTgtColumnId(columnId: Int)(implicit session: DBSession = AutoSession): List[Label] = {
@@ -191,8 +195,8 @@ object Label extends SQLSyntaxSupport[Label] {
val serviceId = service.id.get
/** insert serviceColumn */
- val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion)
- val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion)
+ val srcCol = ServiceColumn.findOrInsert(srcServiceId, srcColumnName, Some(srcColumnType), schemaVersion, None)
+ val tgtCol = ServiceColumn.findOrInsert(tgtServiceId, tgtColumnName, Some(tgtColumnType), schemaVersion, None)
if (srcCol.columnType != srcColumnType) throw new RuntimeException(s"source service column type not matched ${srcCol.columnType} != ${srcColumnType}")
if (tgtCol.columnType != tgtColumnType) throw new RuntimeException(s"target service column type not matched ${tgtCol.columnType} != ${tgtColumnType}")
@@ -259,18 +263,18 @@ object Label extends SQLSyntaxSupport[Label] {
cnt
}
- def updateOption(labelName: String, options: String)(implicit session: DBSession = AutoSession) = {
+ def updateOption(label: Label, options: String)(implicit session: DBSession = AutoSession) = {
scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option"))
- logger.info(s"update options of label $labelName, ${options}")
- val cnt = sql"""update labels set options = $options where label = $labelName""".update().apply()
- val label = Label.findByName(labelName, useCache = false).get
+ logger.info(s"update options of label ${label.label}, ${options}")
+ val cnt = sql"""update labels set options = $options where id = ${label.id.get}""".update().apply()
+ val updatedLabel = findById(label.id.get, useCache = false)
val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
cacheKeys.foreach { key =>
expireCache(className + key)
expireCaches(className + key)
}
- cnt
+ updatedLabel
}
def delete(id: Int)(implicit session: DBSession = AutoSession) = {
@@ -390,12 +394,14 @@ case class Label(id: Option[Int], label: String,
lazy val storageConfigOpt: Option[Config] = toStorageConfig
- lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined
-
def toFetcherConfig: Option[Config] = {
Schema.toConfig(extraOptions, "fetcher")
}
+ def toMutatorConfig: Option[Config] = {
+ Schema.toConfig(extraOptions, "mutator")
+ }
+
def toStorageConfig: Option[Config] = {
Schema.toConfig(extraOptions, "storage")
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
index cc1698a..61f1a09 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/ServiceColumn.scala
@@ -19,9 +19,11 @@
package org.apache.s2graph.core.schema
+import com.typesafe.config.Config
import org.apache.s2graph.core.JSONParser
import org.apache.s2graph.core.JSONParser._
import org.apache.s2graph.core.types.{HBaseType, InnerValLike, InnerValLikeWithTs}
+import org.apache.s2graph.core.utils.logger
import play.api.libs.json.Json
import scalikejdbc._
@@ -29,10 +31,11 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
import Schema._
val className = ServiceColumn.getClass.getSimpleName
- val Default = ServiceColumn(Option(0), -1, "default", "string", "v4")
+ val Default = ServiceColumn(Option(0), -1, "default", "string", "v4", None)
def valueOf(rs: WrappedResultSet): ServiceColumn = {
- ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
+ ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"),
+ rs.string("column_type").toLowerCase(), rs.string("schema_version"), rs.stringOpt("options"))
}
def findByServiceId(serviceId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Seq[ServiceColumn] = {
@@ -65,9 +68,9 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
""".map { rs => ServiceColumn.valueOf(rs) }.single.apply()
}
}
- def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
- sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
- values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
+ def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String, options: Option[String])(implicit session: DBSession = AutoSession) = {
+ sql"""insert into service_columns(service_id, column_name, column_type, schema_version, options)
+ values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion}, ${options})""".execute.apply()
}
def delete(id: Int)(implicit session: DBSession = AutoSession) = {
val serviceColumn = findById(id, useCache = false)
@@ -79,11 +82,14 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
expireCaches(className + key)
}
}
- def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String = HBaseType.DEFAULT_VERSION, useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
+ def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String],
+ schemaVersion: String = HBaseType.DEFAULT_VERSION,
+ options: Option[String],
+ useCache: Boolean = true)(implicit session: DBSession = AutoSession): ServiceColumn = {
find(serviceId, columnName, useCache) match {
case Some(sc) => sc
case None =>
- insert(serviceId, columnName, columnType, schemaVersion)
+ insert(serviceId, columnName, columnType, schemaVersion, options)
// val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName
expireCache(className + cacheKey)
@@ -101,12 +107,29 @@ object ServiceColumn extends SQLSyntaxSupport[ServiceColumn] {
ls
}
+ def updateOption(serviceColumn: ServiceColumn, options: String)(implicit session: DBSession = AutoSession) = {
+ scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option"))
+ logger.info(s"update options of service column ${serviceColumn.service.serviceName} ${serviceColumn.columnName}, ${options}")
+ val cnt = sql"""update service_columns set options = $options where id = ${serviceColumn.id.get}""".update().apply()
+ val column = findById(serviceColumn.id.get, useCache = false)
+
+ val cacheKeys = List(s"id=${column.id.get}",
+ s"serviceId=${serviceColumn.serviceId}:columnName=${serviceColumn.columnName}")
+
+ cacheKeys.foreach { key =>
+ expireCache(className + key)
+ expireCaches(className + key)
+ }
+
+ column
+ }
}
case class ServiceColumn(id: Option[Int],
serviceId: Int,
columnName: String,
columnType: String,
- schemaVersion: String) {
+ schemaVersion: String,
+ options: Option[String]) {
lazy val service = Service.findById(serviceId)
lazy val metasWithoutCache = ColumnMeta.timestamp +: ColumnMeta.findAllByColumn(id.get, false) :+ ColumnMeta.lastModifiedAtColumn
@@ -147,5 +170,13 @@ case class ServiceColumn(id: Option[Int],
}
}
+ lazy val extraOptions = Schema.extraOptions(options)
+ def toFetcherConfig: Option[Config] = {
+ Schema.toConfig(extraOptions, "fetcher")
+ }
+
+ def toMutatorConfig: Option[Config] = {
+ Schema.toConfig(extraOptions, "mutator")
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
new file mode 100644
index 0000000..4deecf5
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticEdgeMutator.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class DefaultOptimisticEdgeMutator(graph: S2GraphLike,
+ serDe: StorageSerDe,
+ optimisticEdgeFetcher: OptimisticEdgeFetcher,
+ optimisticMutator: OptimisticMutator,
+ io: StorageIO) extends EdgeMutator {
+ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
+
+ private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+ optimisticMutator.writeToStorage(cluster, kvs, withWait)
+
+ override def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+ requestTs: Long,
+ retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+ if (stepInnerResult.isEmpty) Future.successful(true)
+ else {
+ val head = stepInnerResult.edgeWithScores.head
+ val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+ val futures = for {
+ edgeWithScore <- stepInnerResult.edgeWithScores
+ } yield {
+ val edge = edgeWithScore.edge
+
+ val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+ val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ /* reverted direction */
+ val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+ val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+ serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+ io.buildIncrementsAsync(indexEdge, -1L)
+ }
+
+ val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+ writeToStorage(zkQuorum, mutations, withWait = true)
+ }
+
+ Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+ }
+ }
+
+ override def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+ val mutations = _edges.flatMap { edge =>
+ val (_, edgeUpdate) =
+ if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+ else S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+ io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+ }
+
+ writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+ _edges.zipWithIndex.map { case (edge, idx) =>
+ idx -> ret.isSuccess
+ }
+ }
+ }
+
+ override def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+ def mutateEdgesInner(edges: Seq[S2EdgeLike],
+ checkConsistency: Boolean,
+ withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ assert(edges.nonEmpty)
+ // TODO:: remove after code review: unreachable code
+ if (!checkConsistency) {
+
+ val futures = edges.map { edge =>
+ val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+ val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+ val mutations =
+ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+ if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+ writeToStorage(zkQuorum, mutations, withWait)
+ }
+ Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+ } else {
+ optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+ conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+ }
+ }
+ }
+
+ val edgeWithIdxs = _edges.zipWithIndex
+ val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+ (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+ } toSeq
+
+ val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+ val edges = edgeGroup.map(_._1)
+ val idxs = edgeGroup.map(_._2)
+ // After deleteAll, process others
+ val mutateEdgeFutures = edges.toList match {
+ case head :: tail =>
+ val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+ //TODO: decide what we will do on failure on vertex put
+ val puts = io.buildVertexPutsAsync(head)
+ val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+ Seq(edgeFuture, vertexFuture)
+ case Nil => Nil
+ }
+
+ val composed = for {
+ // deleteRet <- Future.sequence(deleteAllFutures)
+ mutateRet <- Future.sequence(mutateEdgeFutures)
+ } yield mutateRet
+
+ composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+ }
+
+ Future.sequence(mutateEdges).map { squashedRets =>
+ squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+ }
+ }
+
+ override def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+ val futures = for {
+ edge <- edges
+ } yield {
+ val kvs = for {
+ relEdge <- edge.relatedEdges
+ edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+ } yield {
+ val countWithTs = edge.propertyValueInner(LabelMeta.count)
+ val countVal = countWithTs.innerVal.toString().toLong
+ io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+ }
+ writeToStorage(zkQuorum, kvs, withWait = withWait)
+ }
+
+ Future.sequence(futures)
+ }
+
+ override def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ val kvs = io.buildDegreePuts(edge, degreeVal)
+
+ writeToStorage(zkQuorum, kvs, withWait = true)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
deleted file mode 100644
index 82cc27a..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.s2graph.core.storage
-
-import org.apache.s2graph.core._
-import org.apache.s2graph.core.schema.LabelMeta
-import org.apache.s2graph.core.utils.logger
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class DefaultOptimisticMutator(graph: S2GraphLike,
- serDe: StorageSerDe,
- optimisticEdgeFetcher: OptimisticEdgeFetcher,
- optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator {
-
- lazy val io: StorageIO = new StorageIO(graph, serDe)
-
- lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
-
- private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
- optimisticMutator.writeToStorage(cluster, kvs, withWait)
-
- def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
- requestTs: Long,
- retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
- if (stepInnerResult.isEmpty) Future.successful(true)
- else {
- val head = stepInnerResult.edgeWithScores.head
- val zkQuorum = head.edge.innerLabel.hbaseZkAddr
- val futures = for {
- edgeWithScore <- stepInnerResult.edgeWithScores
- } yield {
- val edge = edgeWithScore.edge
-
- val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
-
- val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- /* reverted direction */
- val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
- val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
- serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
- io.buildIncrementsAsync(indexEdge, -1L)
- }
-
- val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
-
- writeToStorage(zkQuorum, mutations, withWait = true)
- }
-
- Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
- }
- }
-
- def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- if (vertex.op == GraphUtil.operations("delete")) {
- writeToStorage(zkQuorum,
- serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
- } else if (vertex.op == GraphUtil.operations("deleteAll")) {
- logger.info(s"deleteAll for vertex is truncated. $vertex")
- Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
- } else {
- writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
- }
- }
-
- def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
- val mutations = _edges.flatMap { edge =>
- val (_, edgeUpdate) =
- if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
- else S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
- io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
- }
-
- writeToStorage(zkQuorum, mutations, withWait).map { ret =>
- _edges.zipWithIndex.map { case (edge, idx) =>
- idx -> ret.isSuccess
- }
- }
- }
-
- def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
- def mutateEdgesInner(edges: Seq[S2EdgeLike],
- checkConsistency: Boolean,
- withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- assert(edges.nonEmpty)
- // TODO:: remove after code review: unreachable code
- if (!checkConsistency) {
-
- val futures = edges.map { edge =>
- val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
-
- val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
- val mutations =
- io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
-
- if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
-
- writeToStorage(zkQuorum, mutations, withWait)
- }
- Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
- } else {
- optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
- conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
- }
- }
- }
-
- val edgeWithIdxs = _edges.zipWithIndex
- val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
- (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
- } toSeq
-
- val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
- val edges = edgeGroup.map(_._1)
- val idxs = edgeGroup.map(_._2)
- // After deleteAll, process others
- val mutateEdgeFutures = edges.toList match {
- case head :: tail =>
- val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
-
- //TODO: decide what we will do on failure on vertex put
- val puts = io.buildVertexPutsAsync(head)
- val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
- Seq(edgeFuture, vertexFuture)
- case Nil => Nil
- }
-
- val composed = for {
- // deleteRet <- Future.sequence(deleteAllFutures)
- mutateRet <- Future.sequence(mutateEdgeFutures)
- } yield mutateRet
-
- composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
- }
-
- Future.sequence(mutateEdges).map { squashedRets =>
- squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
- }
- }
-
- def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
- val futures = for {
- edge <- edges
- } yield {
- val kvs = for {
- relEdge <- edge.relatedEdges
- edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
- } yield {
- val countWithTs = edge.propertyValueInner(LabelMeta.count)
- val countVal = countWithTs.innerVal.toString().toLong
- io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
- }
- writeToStorage(zkQuorum, kvs, withWait = withWait)
- }
-
- Future.sequence(futures)
- }
-
- def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
- val kvs = io.buildDegreePuts(edge, degreeVal)
-
- writeToStorage(zkQuorum, kvs, withWait = true)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
new file mode 100644
index 0000000..6be619b
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticVertexMutator.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.utils.logger
+import org.apache.s2graph.core.{GraphUtil, S2GraphLike, S2VertexLike, VertexMutator}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class DefaultOptimisticVertexMutator(graph: S2GraphLike,
+ serDe: StorageSerDe,
+ optimisticEdgeFetcher: OptimisticEdgeFetcher,
+ optimisticMutator: OptimisticMutator,
+ io: StorageIO) extends VertexMutator {
+
+ def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+ if (vertex.op == GraphUtil.operations("delete")) {
+ optimisticMutator.writeToStorage(zkQuorum,
+ serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+ } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+ logger.info(s"deleteAll for vertex is truncated. $vertex")
+ Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+ } else {
+ optimisticMutator.writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index 36ecfcb..bf620bf 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -42,12 +42,8 @@ abstract class Storage(val graph: S2GraphLike,
val edgeFetcher: EdgeFetcher
- val edgeBulkFetcher: EdgeBulkFetcher
-
val vertexFetcher: VertexFetcher
- val vertexBulkFetcher: VertexBulkFetcher
-
val edgeMutator: EdgeMutator
val vertexMutator: VertexMutator
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
deleted file mode 100644
index 3d25dd9..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.storage.serde.Serializable
-import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike}
-import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.core.utils.{CanDefer, Extensions}
-import org.hbase.async.{HBaseClient, KeyValue}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike,
- val config: Config,
- val client: HBaseClient,
- val serDe: StorageSerDe,
- val io: StorageIO) extends EdgeBulkFetcher {
- import Extensions.DeferOps
- import CanDefer._
- import scala.collection.JavaConverters._
- import AsynchbaseStorage._
-
- override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
- val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
- val distinctLabels = labels.toSet
- val scan = AsynchbasePatcher.newScanner(client, hTableName)
- scan.setFamily(Serializable.edgeCf)
- scan.setMaxVersions(1)
-
- scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
- case null => Seq.empty
- case kvsLs =>
- kvsLs.asScala.flatMap { kvs =>
- kvs.asScala.flatMap { kv =>
- val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
-
- serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
- .fromKeyValues(Seq(kv), None)
- .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
- }
- }
- }
- }
-
- Future.sequence(futures).map(_.flatten)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
index 4239d15..8eafc68 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeFetcher.scala
@@ -25,12 +25,14 @@ import com.stumbleupon.async.Deferred
import com.typesafe.config.Config
import org.apache.hadoop.hbase.util.Bytes
import org.apache.s2graph.core._
-import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.storage.serde.Serializable
+import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe}
import org.apache.s2graph.core.types.{HBaseType, VertexId}
import org.apache.s2graph.core.utils.{CanDefer, DeferCache, Extensions, logger}
import org.hbase.async._
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future}
class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
val config: Config,
@@ -64,6 +66,31 @@ class AsynchbaseEdgeFetcher(val graph: S2GraphLike,
}.toFuture(emptyStepResult).map(_.asScala)
}
+ override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = {
+ val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) =>
+ val distinctLabels = labels.toSet
+ val scan = AsynchbasePatcher.newScanner(client, hTableName)
+ scan.setFamily(Serializable.edgeCf)
+ scan.setMaxVersions(1)
+
+ scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+ case null => Seq.empty
+ case kvsLs =>
+ kvsLs.asScala.flatMap { kvs =>
+ kvs.asScala.flatMap { kv =>
+ val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv)
+
+ serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION)
+ .fromKeyValues(Seq(kv), None)
+ .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
+ }
+ }
+ }
+ }
+
+ Future.sequence(futures).map(_.flatten)
+ }
+
/**
* we are using future cache to squash requests into same key on storage.
*
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index f65ee20..89303e6 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -323,17 +323,14 @@ class AsynchbaseStorage(override val graph: S2GraphLike,
private lazy val optimisticEdgeFetcher = new AsynchbaseOptimisticEdgeFetcher(client, serDe, io)
private lazy val optimisticMutator = new AsynchbaseOptimisticMutator(graph, serDe, optimisticEdgeFetcher, client, clientWithFlush)
- private lazy val _mutator = new DefaultOptimisticMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator)
override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients)
override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph)
override val edgeFetcher: EdgeFetcher = new AsynchbaseEdgeFetcher(graph, config, client, serDe, io)
- override val edgeBulkFetcher: EdgeBulkFetcher = new AsynchbaseEdgeBulkFetcher(graph, config, client, serDe, io)
override val vertexFetcher: VertexFetcher = new AsynchbaseVertexFetcher(graph, config, client, serDe, io)
- override val vertexBulkFetcher: VertexBulkFetcher = new AsynchbaseVertexBulkFetcher(graph, config, client, serDe, io)
- override val edgeMutator: EdgeMutator = _mutator
- override val vertexMutator: VertexMutator = _mutator
+ override val edgeMutator: EdgeMutator = new DefaultOptimisticEdgeMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
+ override val vertexMutator: VertexMutator = new DefaultOptimisticVertexMutator(graph, serDe, optimisticEdgeFetcher, optimisticMutator, io)
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
deleted file mode 100644
index e6bf4e6..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexBulkFetcher.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.hbase
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.ServiceColumn
-import org.apache.s2graph.core.storage.serde.Serializable
-import org.apache.s2graph.core.storage.{StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.apache.s2graph.core.utils.Extensions
-import org.apache.s2graph.core.{S2Graph, S2GraphLike, VertexBulkFetcher}
-import org.hbase.async.HBaseClient
-
-import scala.concurrent.{ExecutionContext, Future}
-
-class AsynchbaseVertexBulkFetcher(val graph: S2GraphLike,
- val config: Config,
- val client: HBaseClient,
- val serDe: StorageSerDe,
- val io: StorageIO) extends VertexBulkFetcher {
-
- import AsynchbaseStorage._
- import Extensions.DeferOps
-
- import scala.collection.JavaConverters._
-
- override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
- val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
- val distinctColumns = columns.toSet
- val scan = AsynchbasePatcher.newScanner(client, hTableName)
- scan.setFamily(Serializable.vertexCf)
- scan.setMaxVersions(1)
-
- scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
- case null => Seq.empty
- case kvsLs =>
- kvsLs.asScala.flatMap { kvs =>
- serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
- .filter(v => distinctColumns(v.serviceColumn))
- }
- }
- }
- Future.sequence(futures).map(_.flatten)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
index 560dd2b..f16c8e9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseVertexFetcher.scala
@@ -21,7 +21,11 @@ package org.apache.s2graph.core.storage.hbase
import com.typesafe.config.Config
import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.ServiceColumn
+import org.apache.s2graph.core.storage.serde.Serializable
import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
+import org.apache.s2graph.core.types.HBaseType
+import org.apache.s2graph.core.utils.Extensions
import org.hbase.async.HBaseClient
import scala.concurrent.{ExecutionContext, Future}
@@ -32,6 +36,9 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike,
val serDe: StorageSerDe,
val io: StorageIO) extends VertexFetcher {
import AsynchbaseStorage._
+ import Extensions.DeferOps
+ import scala.collection.JavaConverters._
+
private def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] = {
val rpc = buildRequest(serDe, queryRequest, vertex)
@@ -58,4 +65,23 @@ class AsynchbaseVertexFetcher(val graph: S2GraphLike,
Future.sequence(futures).map(_.flatten)
}
+
+ override def fetchVerticesAll()(implicit ec: ExecutionContext) = {
+ val futures = ServiceColumn.findAll().groupBy(_.service.hTableName).toSeq.map { case (hTableName, columns) =>
+ val distinctColumns = columns.toSet
+ val scan = AsynchbasePatcher.newScanner(client, hTableName)
+ scan.setFamily(Serializable.vertexCf)
+ scan.setMaxVersions(1)
+
+ scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map {
+ case null => Seq.empty
+ case kvsLs =>
+ kvsLs.asScala.flatMap { kvs =>
+ serDe.vertexDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(kvs.asScala, None)
+ .filter(v => distinctColumns(v.serviceColumn))
+ }
+ }
+ }
+ Future.sequence(futures).map(_.flatten)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/be83d07c/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
deleted file mode 100644
index 2ca4b35..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksEdgeBulkFetcher.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.storage.rocks
-
-import com.typesafe.config.Config
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2GraphLike}
-import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageSerDe}
-import org.apache.s2graph.core.types.HBaseType
-import org.rocksdb.RocksDB
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
-
-class RocksEdgeBulkFetcher(val graph: S2GraphLike,
- val config: Config,
- val db: RocksDB,
- val vdb: RocksDB,
- val serDe: StorageSerDe,
- val io: StorageIO) extends EdgeBulkFetcher {
- import RocksStorage._
-
- override def fetchEdgesAll()(implicit ec: ExecutionContext) = {
- val edges = new ArrayBuffer[S2EdgeLike]()
- Label.findAll().groupBy(_.hbaseTableName).toSeq.foreach { case (hTableName, labels) =>
- val distinctLabels = labels.toSet
-
- val iter = db.newIterator()
- try {
- iter.seekToFirst()
- while (iter.isValid) {
- val kv = SKeyValue(table, iter.key(), SKeyValue.EdgeCf, qualifier, iter.value, System.currentTimeMillis())
-
- serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION).fromKeyValues(Seq(kv), None)
- .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree)
- .foreach { edge =>
- edges += edge
- }
-
-
- iter.next()
- }
-
- } finally {
- iter.close()
- }
- }
-
- Future.successful(edges)
- }
-}