You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by st...@apache.org on 2018/05/11 03:05:49 UTC

[09/11] incubator-s2graph git commit: add cacheTTLInSecs on SafeUpdateCache.withCache.

add cacheTTLInSecs on SafeUpdateCache.withCache.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8230417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8230417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8230417

Branch: refs/heads/master
Commit: e823041715dcca2d330bb25410a0a7cf420bc522
Parents: 9132f74
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu May 10 10:35:35 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu May 10 10:35:35 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/Management.scala    |  8 ++++----
 .../apache/s2graph/core/ResourceManager.scala   | 20 ++++++++++++--------
 .../s2graph/core/utils/SafeUpdateCache.scala    |  9 ++++++---
 3 files changed, 22 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index c3aef7a..f534423 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -313,7 +313,7 @@ class Management(graph: S2GraphLike) {
 
   def updateEdgeFetcher(label: Label, options: String): Unit = {
     val newLabel = Label.updateOption(label, options)
-    graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, forceUpdate = true)
+    graph.resourceManager.getOrElseUpdateEdgeFetcher(newLabel, cacheTTLInSecs = Option(-1))
   }
 
   def updateVertexFetcher(serviceName: String, columnName: String, options: String): Unit = {
@@ -325,7 +325,7 @@ class Management(graph: S2GraphLike) {
 
   def updateVertexFetcher(column: ServiceColumn, options: String): Unit = {
     val newColumn = ServiceColumn.updateOption(column, options)
-    graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, forceUpdate = true)
+    graph.resourceManager.getOrElseUpdateVertexFetcher(newColumn, cacheTTLInSecs = Option(-1))
   }
 
   def updateEdgeMutator(labelName: String, options: String): Unit = {
@@ -336,7 +336,7 @@ class Management(graph: S2GraphLike) {
 
   def updateEdgeMutator(label: Label, options: String): Unit = {
     val newLabel = Label.updateOption(label, options)
-    graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, forceUpdate = true)
+    graph.resourceManager.getOrElseUpdateEdgeMutator(newLabel, cacheTTLInSecs = Option(-1))
   }
 
   def updateVertexMutator(serviceName: String, columnName: String, options: String): Unit = {
@@ -348,7 +348,7 @@ class Management(graph: S2GraphLike) {
 
   def updateVertexMutator(column: ServiceColumn, options: String): Unit = {
     val newColumn = ServiceColumn.updateOption(column, options)
-    graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, forceUpdate = true)
+    graph.resourceManager.getOrElseUpdateVertexMutator(newColumn, cacheTTLInSecs = Option(-1))
   }
 
   def createStorageTable(zkAddr: String,

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index b877603..a8a3a34 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -59,9 +59,10 @@ class ResourceManager(graph: S2GraphLike,
     cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
   }
 
-  def getOrElseUpdateVertexFetcher(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexFetcher] = {
+  def getOrElseUpdateVertexFetcher(column: ServiceColumn,
+                                   cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = {
     val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName
-    cache.withCache(cacheKey, false, forceUpdate) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs) {
       column.toFetcherConfig.map { fetcherConfig =>
         val className = fetcherConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -76,10 +77,11 @@ class ResourceManager(graph: S2GraphLike,
     }
   }
 
-  def getOrElseUpdateEdgeFetcher(label: Label, forceUpdate: Boolean = false): Option[EdgeFetcher] = {
+  def getOrElseUpdateEdgeFetcher(label: Label,
+                                 cacheTTLInSecs: Option[Int] = None): Option[EdgeFetcher] = {
     val cacheKey = EdgeFetcherKey + "_" + label.label
 
-    cache.withCache(cacheKey, false, forceUpdate) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs) {
       label.toFetcherConfig.map { fetcherConfig =>
         val className = fetcherConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -94,9 +96,10 @@ class ResourceManager(graph: S2GraphLike,
     }
   }
 
-  def getOrElseUpdateVertexMutator(column: ServiceColumn, forceUpdate: Boolean = false): Option[VertexMutator] = {
+  def getOrElseUpdateVertexMutator(column: ServiceColumn,
+                                   cacheTTLInSecs: Option[Int] = None): Option[VertexMutator] = {
     val cacheKey = VertexMutatorKey + "_" + column.service.serviceName + "_" + column.columnName
-    cache.withCache(cacheKey, false, forceUpdate) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs) {
       column.toMutatorConfig.map { mutatorConfig =>
         val className = mutatorConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)
@@ -111,9 +114,10 @@ class ResourceManager(graph: S2GraphLike,
     }
   }
 
-  def getOrElseUpdateEdgeMutator(label: Label, forceUpdate: Boolean = false): Option[EdgeMutator] = {
+  def getOrElseUpdateEdgeMutator(label: Label,
+                                 cacheTTLInSecs: Option[Int] = None): Option[EdgeMutator] = {
     val cacheKey = EdgeMutatorKey + "_" + label.label
-    cache.withCache(cacheKey, false, forceUpdate) {
+    cache.withCache(cacheKey, false, cacheTTLInSecs) {
       label.toMutatorConfig.map { mutatorConfig =>
         val className = mutatorConfig.getString(ClassNameKey)
         val fetcher = Class.forName(className)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8230417/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 51e45c2..3ecb02f 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -86,7 +86,7 @@ class SafeUpdateCache(val config: Config)
   import java.lang.{Long => JLong}
   import SafeUpdateCache._
   val maxSize = config.getInt(SafeUpdateCache.MaxSizeKey)
-  val ttl = config.getInt(SafeUpdateCache.TtlKey)
+  val systemTtl = config.getInt(SafeUpdateCache.TtlKey)
   private val cache = CacheBuilder.newBuilder().maximumSize(maxSize)
     .build[JLong, (AnyRef, Int, AtomicBoolean)]()
 
@@ -116,7 +116,7 @@ class SafeUpdateCache(val config: Config)
 
   def withCache[T <: AnyRef](key: String,
                              broadcast: Boolean,
-                             forceUpdate: Boolean = false)(op: => T): T = {
+                             cacheTTLInSecs: Option[Int] = None)(op: => T): T = {
     val cacheKey = toCacheKey(key)
     val cachedValWithTs = cache.getIfPresent(cacheKey)
 
@@ -129,7 +129,10 @@ class SafeUpdateCache(val config: Config)
       val (_cachedVal, updatedAt, isUpdating) = cachedValWithTs
       val cachedVal = _cachedVal.asInstanceOf[T]
 
-      if (!forceUpdate && toTs() < updatedAt + ttl) cachedVal // in cache TTL
+      val ttl = cacheTTLInSecs.getOrElse(systemTtl)
+      val isValidCacheVal = toTs() < updatedAt + ttl
+
+      if (isValidCacheVal) cachedVal // in cache TTL
       else {
         val running = isUpdating.getAndSet(true)