You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:49 UTC
[09/11] incubator-s2graph git commit: add cacheTTLInSecs on
SafeUpdateCache.withCache.
add cacheTTLInSecs on SafeUpdateCache.withCache.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8230417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8230417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8230417
Branch: refs/heads/master
Commit: e823041715dcca2d330bb25410a0a7cf420bc522
Parents: 9132f74
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 10 10:35:35 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 10 10:35:35 2018 +0900
----------------------------------------------------------------------
.../org/apache/s2graph/core/Management.scala | 8 ++++----
.../apache/s2graph/core/ResourceManager.scala | 20 ++++++++++++--------
.../s2graph/core/utils/SafeUpdateCache.scala | 9 ++++++---
3 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index c3aef7a..f534423 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -313,7 +313,7 @@ class Management(graph: S2GraphLike) {
def updateEdgeFetcher(label: Label, options: String): Unit = {
val newLabel = Label.updateOption(label, options)
- graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true)
+ graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, cacheTTLInSecs = Option(-1))
}
def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = {
@@ -325,7 +325,7 @@ class Management(graph: S2GraphLike) {
def updateVertexFetcher(column: ServiceColumn, options: String): Unit = {
val newColumn = ServiceColumn.updateOption(column, options)
- graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, forceUpdate = true)
+ graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, cacheTTLInSecs = Option(-1))
}
def updateEdgeMutator(labelName: String, options: String): Unit = {
@@ -336,7 +336,7 @@ class Management(graph: S2GraphLike) {
def updateEdgeMutator(label: Label, options: String): Unit = {
val newLabel = Label.updateOption(label, options)
- graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true)
+ graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, cacheTTLInSecs = Option(-1))
}
def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = {
@@ -348,7 +348,7 @@ class Management(graph: S2GraphLike) {
def updateVertexMutator(column: ServiceColumn, options: String): Unit = {
val newColumn = ServiceColumn.updateOption(column, options)
- graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true)
+ graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, cacheTTLInSecs = Option(-1))
}
def createStorageTable(zkAddr: String,
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index b877603..a8a3a34 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -59,9 +59,10 @@ class ResourceManager(graph: S2GraphLike,
cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
}
- def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = {
+ def getOrElseUpdateVertexFetcher(column: ServiceColumn,
+ cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = {
val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName
- cache.withCache(cacheKey, false, forceUpdate) {
+ cache.withCache(cacheKey, false, cacheTTLInSecs) {
column.toFetcherConfig.map { fetcherConfig =>
val className = fetcherConfig.getString(ClassNameKey)
val fetcher = Class.forName(className)
@@ -76,10 +77,11 @@ class ResourceManager(graph: S2GraphLike,
}
}
- def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = {
+ def getOrElseUpdateEdgeFetcher(label: Label,
+ cacheTTLInSecs: Option[Int] = None): Option[EdgeFetcher] = {
val cacheKey = EdgeFetcherKey + "_" + label.label
- cache.withCache(cacheKey, false, forceUpdate) {
+ cache.withCache(cacheKey, false, cacheTTLInSecs) {
label.toFetcherConfig.map { fetcherConfig =>
val className = fetcherConfig.getString(ClassNameKey)
val fetcher = Class.forName(className)
@@ -94,9 +96,10 @@ class ResourceManager(graph: S2GraphLike,
}
}
- def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = {
+ def getOrElseUpdateVertexMutator(column: ServiceColumn,
+ cacheTTLInSecs: Option[Int] = None): Option[VertexMutator] = {
val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName
- cache.withCache(cacheKey, false, forceUpdate) {
+ cache.withCache(cacheKey, false, cacheTTLInSecs) {
column.toMutatorConfig.map { mutatorConfig =>
val className = mutatorConfig.getString(ClassNameKey)
val fetcher = Class.forName(className)
@@ -111,9 +114,10 @@ class ResourceManager(graph: S2GraphLike,
}
}
- def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = {
+ def getOrElseUpdateEdgeMutator(label: Label,
+ cacheTTLInSecs: Option[Int] = None): Option[EdgeMutator] = {
val cacheKey = EdgeMutatorKey + "_" + label.label
- cache.withCache(cacheKey, false, forceUpdate) {
+ cache.withCache(cacheKey, false, cacheTTLInSecs) {
label.toMutatorConfig.map { mutatorConfig =>
val className = mutatorConfig.getString(ClassNameKey)
val fetcher = Class.forName(className)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 51e45c2..3ecb02f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -86,7 +86,7 @@ class SafeUpdateCache(val config: Config)
import java.lang.{Long => JLong}
import SafeUpdateCache._
val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey)
- val ttl = config.getInt(SafeUpdateCache.TtlKey)
+ val systemTtl = config.getInt(SafeUpdateCache.TtlKey)
private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
.build[JLong, (AnyRef, Int, AtomicBoolean)]()
@@ -116,7 +116,7 @@ class SafeUpdateCache(val config: Config)
def withCache[T <: AnyRef](key: String,
broadcast: Boolean,
- forceUpdate: Boolean = false)(op: => T): T = {
+ cacheTTLInSecs: Option[Int] = None)(op: => T): T = {
val cacheKey = toCacheKey(key)
val cachedValWithTs = cache.getIfPresent(cacheKey)
@@ -129,7 +129,10 @@ class SafeUpdateCache(val config: Config)
val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs
val cachedVal = _cachedVal.asInstanceOf[T]
- if (!forceUpdate && toTs() < updatedAt + ttl) cachedVal // in cache TTL
+ val ttl = cacheTTLInSecs.getOrElse(systemTtl)
+ val isValidCacheVal = toTs() < updatedAt + ttl
+
+ if (isValidCacheVal) cachedVal // in cache TTL
else {
val running = isUpdating.getAndSet(true)