You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@s2graph.apache.org by da...@apache.org on 2018/07/13 06:29:43 UTC

[1/3] incubator-s2graph git commit: add background task on ResourceManager onEvict.

Repository: incubator-s2graph
Updated Branches:
  refs/heads/master c1698e31e -> 07a5af39a


add background task on ResourceManager onEvict.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/447eca4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/447eca4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/447eca4c

Branch: refs/heads/master
Commit: 447eca4c24e3915deaa86c8a2e2222976308ea92
Parents: 08d6a3e
Author: DO YUNG YOON <st...@apache.org>
Authored: Mon Jul 9 16:27:22 2018 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Mon Jul 9 16:27:22 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/ResourceManager.scala   | 28 +++++++++++++++-----
 .../scala/org/apache/s2graph/core/S2Graph.scala |  8 +++---
 .../s2graph/core/utils/SafeUpdateCache.scala    |  2 +-
 3 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
index 051ca9f..14ff767 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/ResourceManager.scala
@@ -19,6 +19,8 @@
 
 package org.apache.s2graph.core
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import com.typesafe.config.impl.ConfigImpl
 import com.typesafe.config._
 import org.apache.s2graph.core.schema.{Label, ServiceColumn}
@@ -47,7 +49,7 @@ object ResourceManager {
   val EdgeMutatorKey = classOf[EdgeMutator].getName
   val VertexMutatorKey = classOf[VertexMutator].getName
 
-  val DefaultMaxSize = 1000
+  val DefaultMaxSize = 10
   val DefaultCacheTTL = -1
   val DefaultConfig = ConfigFactory.parseMap(Map(MaxSizeKey -> DefaultMaxSize, TtlKey -> DefaultCacheTTL).asJava)
 }
@@ -59,6 +61,13 @@ class ResourceManager(graph: S2GraphLike,
 
   import scala.collection.JavaConverters._
 
+  def shutdown(): Unit = {
+    cache.asMap().asScala.foreach { case (_, (obj, _, _)) =>
+      onEvict(obj)
+    }
+  }
+  val scheduler = Executors.newScheduledThreadPool(1)
+  val waitForEvictionInSeconds = 10
   val maxSize = Try(_config.getInt(ResourceManager.MaxSizeKey)).getOrElse(DefaultMaxSize)
   val cacheTTL = Try(_config.getInt(ResourceManager.CacheTTL)).getOrElse(DefaultCacheTTL)
 
@@ -72,21 +81,28 @@ class ResourceManager(graph: S2GraphLike,
     cache.asMap().asScala.toSeq.collect { case (_, (obj: EdgeFetcher, _, _)) => obj }
   }
 
+
   def onEvict(oldValue: AnyRef): Unit = {
     oldValue match {
       case o: Option[_] => o.foreach { case v: AutoCloseable =>
-        v.close()
-        logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
+        scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS)
       }
 
       case v: AutoCloseable =>
-        v.close()
-        logger.info(s"[${oldValue.getClass.getName}]: $oldValue evicted.")
-
+        scheduler.schedule(newCloseTask(v), waitForEvictionInSeconds, TimeUnit.SECONDS)
       case _ => logger.info(s"Class does't have close() method ${oldValue.getClass.getName}")
     }
   }
 
+  private def newCloseTask(v: AutoCloseable) = {
+    new Runnable {
+      override def run(): Unit = {
+        v.close()
+        logger.info(s"[${v.getClass.getName}]: $v evicted.")
+      }
+    }
+  }
+
   def getOrElseUpdateVertexFetcher(column: ServiceColumn,
                                    cacheTTLInSecs: Option[Int] = None): Option[VertexFetcher] = {
     val cacheKey = VertexFetcherKey + "_" + column.service.serviceName + "_" + column.columnName

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
index 7d6e20b..9657e10 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala
@@ -295,6 +295,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
     if (running.compareAndSet(true, false)) {
       flushStorage()
       Schema.shutdown(modelDataDelete)
+      resourceManager.shutdown()
       defaultStorage.shutdown()
       localLongId.set(0l)
     }
@@ -353,9 +354,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap
       mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2)))
     }
 
-    indexProvider.mutateVerticesAsync(vertices)
-    Future.sequence(futures).map{ ls =>
-      ls.flatten.toSeq.sortBy(_._2).map(_._1)
+    Future.sequence(futures).flatMap { ls =>
+      indexProvider.mutateVerticesAsync(vertices).map { _ =>
+        ls.flatten.toSeq.sortBy(_._2).map(_._1)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/447eca4c/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
index 755b8d0..bf1f22c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/SafeUpdateCache.scala
@@ -161,8 +161,8 @@ class SafeUpdateCache(val config: Config)
               put(key, cachedVal, false)
               logger.error(s"withCache update failed: $cacheKey", ex)
             case Success(newValue) =>
-              put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
 
+              put(key, newValue, broadcast = (broadcast && newValue != cachedVal))
               onEvict(cachedVal)
 
               cachedVal match {


[2/3] incubator-s2graph git commit: Merge branch 'S2GRAPH-230'

Posted by da...@apache.org.
Merge branch 'S2GRAPH-230'

* S2GRAPH-230:
  add background task on ResourceManager onEvict.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/be8f0419
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/be8f0419
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/be8f0419

Branch: refs/heads/master
Commit: be8f0419b3d4feceb66dafad112a9435eef713c5
Parents: c1698e3 447eca4
Author: daewon <da...@apache.org>
Authored: Fri Jul 13 15:28:01 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Fri Jul 13 15:28:01 2018 +0900

----------------------------------------------------------------------
 .../apache/s2graph/core/ResourceManager.scala   | 28 +++++++++++++++-----
 .../scala/org/apache/s2graph/core/S2Graph.scala |  8 +++---
 .../s2graph/core/utils/SafeUpdateCache.scala    |  2 +-
 3 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------



[3/3] incubator-s2graph git commit: [S2GRAPH-230] ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher

Posted by da...@apache.org.
[S2GRAPH-230] ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher

JIRA:
  [S2GRAPH-230] https://issues.apache.org/jira/browse/S2GRAPH-230

Pull Request:
  Closes #179

Author:
  DO YUNG YOON <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/07a5af39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/07a5af39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/07a5af39

Branch: refs/heads/master
Commit: 07a5af39a5a52aad2e61c34d685fe00a6bb8e2cc
Parents: be8f041
Author: daewon <da...@apache.org>
Authored: Fri Jul 13 15:28:27 2018 +0900
Committer: daewon <da...@apache.org>
Committed: Fri Jul 13 15:28:27 2018 +0900

----------------------------------------------------------------------
 CHANGES | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/07a5af39/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 0595ad5..734f498 100644
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,7 @@ Release Notes - S2Graph - Version 0.2.0
     * [S2GRAPH-214] - Add REAME for movielens examples
     * [S2GRAPH-216] - Provide a transform directive in the GraphQL query result.
     * [S2GRAPH-221] - Unify configurations for bulk and mutate in S2GraphSink.
+    * [S2GRAPH-230] - ResourceManager onEvict cause segmentation fault with AnnoyModelFetcher 
     * [S2GRAPH-231] - Change the GraphQL type name to a valid string.
 
 ** New Feature