You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@s2graph.apache.org by st...@apache.org on 2016/08/31 21:31:15 UTC
incubator-s2graph git commit: [S2GRAPH-82]: Merge DeferCache and FutureCache.
Repository: incubator-s2graph
Updated Branches:
refs/heads/master 69273249a -> 09a7919de
[S2GRAPH-82]: Merge DeferCache and FutureCache.
JIRA:
[S2GRAPH-82] https://issues.apache.org/jira/browse/S2GRAPH-82
Pull Request:
Closes #54
Authors:
Daewon Jeong: <bl...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/09a7919d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/09a7919d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/09a7919d
Branch: refs/heads/master
Commit: 09a7919de88571e2b2a6924ef3d28fcc69b8d593
Parents: 6927324
Author: DO YUNG YOON <st...@apache.org>
Authored: Thu Sep 1 06:35:13 2016 +0900
Committer: DO YUNG YOON <st...@apache.org>
Committed: Thu Sep 1 06:35:13 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
s2core/src/main/resources/reference.conf | 1 +
.../scala/org/apache/s2graph/core/Graph.scala | 1 +
.../core/storage/hbase/AsynchbaseStorage.scala | 33 ++--
.../apache/s2graph/core/utils/DeferCache.scala | 162 +++++++++++++++----
.../apache/s2graph/core/utils/FutureCache.scala | 101 ------------
.../org/apache/s2graph/core/utils/Logger.scala | 3 +
7 files changed, 154 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 28cfbcd..ebb11c6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -74,6 +74,8 @@ Release 0.12.1 - unreleased
S2GRAPH-70: Automate the process of building a distribution package
(Contributed by Jong Wook Kim<jo...@nyu.edu>, committed by DOYUNG YOON)
+ S2GRAPH-82: Merge DeferCache and FutureCache (Committed by Daewon Jeong).
+
BUG FIXES
S2GRAPH-18: Query Option "interval" is Broken.
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/s2core/src/main/resources/reference.conf b/s2core/src/main/resources/reference.conf
index 86dfa67..e5c129b 100644
--- a/s2core/src/main/resources/reference.conf
+++ b/s2core/src/main/resources/reference.conf
@@ -39,6 +39,7 @@ max.back.off=50
future.cache.max.size=100000
future.cache.expire.after.write=10000
future.cache.expire.after.access=5000
+future.cache.metric.interval=60000
# Local Cache
cache.ttl.seconds=60
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
index aee0e95..e5aa6eb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala
@@ -61,6 +61,7 @@ object Graph {
"future.cache.max.size" -> java.lang.Integer.valueOf(100000),
"future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000),
"future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000),
+ "future.cache.metric.interval" -> java.lang.Integer.valueOf(60000),
"s2graph.storage.backend" -> "hbase",
"query.hardlimit" -> java.lang.Integer.valueOf(100000)
)
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index c0c369b..19d5cc8 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -36,13 +36,13 @@ import org.apache.s2graph.core._
import org.apache.s2graph.core.mysqls.LabelMeta
import org.apache.s2graph.core.storage._
import org.apache.s2graph.core.types.{HBaseType, VertexId}
-import org.apache.s2graph.core.utils.{DeferCache, Extensions, FutureCache, logger}
+import org.apache.s2graph.core.utils._
import org.hbase.async._
import scala.collection.JavaConversions._
import scala.collection.{Map, Seq}
import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future, duration}
+import scala.concurrent._
import scala.util.hashing.MurmurHash3
@@ -98,11 +98,13 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
private val emptyKeyValues = new util.ArrayList[KeyValue]()
private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client
+ import CanDefer._
+
/** Future Cache to squash request */
- private val futureCache = new DeferCache[QueryRequestWithResult](config)(ec)
+ private val futureCache = new DeferCache[QueryResult, Deferred, Deferred](config, QueryResult(), "FutureCache", useMetric = true)
/** Simple Vertex Cache */
- private val vertexCache = new FutureCache[Seq[SKeyValue]](config)(ec)
+ private val vertexCache = new DeferCache[Seq[SKeyValue], Promise, Future](config, Seq.empty[SKeyValue])
/**
@@ -277,19 +279,19 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
isInnerCall: Boolean,
parentEdges: Seq[EdgeWithScore]): Deferred[QueryRequestWithResult] = {
- def fetchInner(hbaseRpc: AnyRef): Deferred[QueryRequestWithResult] = {
+ def fetchInner(hbaseRpc: AnyRef): Deferred[QueryResult] = {
fetchKeyValuesInner(hbaseRpc).withCallback { kvs =>
val edgeWithScores = toEdges(kvs, queryRequest.queryParam, prevStepScore, isInnerCall, parentEdges)
val resultEdgesWithScores = if (queryRequest.queryParam.sample >= 0) {
sample(queryRequest, edgeWithScores, queryRequest.queryParam.sample)
} else edgeWithScores
-// QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
- QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
+ QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty[Byte]))
+// QueryRequestWithResult(queryRequest, QueryResult(resultEdgesWithScores, tailCursor = kvs.lastOption.map(_.key).getOrElse(Array.empty)))
} recoverWith { ex =>
logger.error(s"fetchInner failed. fallback return. $hbaseRpc}", ex)
-// QueryResult(isFailure = true)
- QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
+ QueryResult(isFailure = true)
+// QueryRequestWithResult(queryRequest, QueryResult(isFailure = true))
}
}
@@ -297,13 +299,14 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
val cacheTTL = queryParam.cacheTTLInMillis
val request = buildRequest(queryRequest)
-
- if (cacheTTL <= 0) fetchInner(request)
- else {
- val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
- val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
- futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
+ val defer =
+ if (cacheTTL <= 0) fetchInner(request)
+ else {
+ val cacheKeyBytes = Bytes.add(queryRequest.query.cacheKeyBytes, toCacheKeyBytes(request))
+ val cacheKey = queryParam.toCacheKey(cacheKeyBytes)
+ futureCache.getOrElseUpdate(cacheKey, cacheTTL)(fetchInner(request))
}
+ defer withCallback { queryResult => QueryRequestWithResult(queryRequest, queryResult)}
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
index 4198010..96f87ed 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/DeferCache.scala
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,83 +19,179 @@
package org.apache.s2graph.core.utils
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{Executors, TimeUnit}
import com.google.common.cache.CacheBuilder
-import com.stumbleupon.async.Deferred
+import com.stumbleupon.async.{Callback, Deferred}
import com.typesafe.config.Config
-import scala.concurrent.ExecutionContext
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.language.higherKinds
-class DeferCache[R](config: Config)(implicit ex: ExecutionContext) {
+trait CanDefer[A, M[_], C[_]] {
+ def promise: M[A]
- import Extensions.DeferOps
+ def future(defer: M[A]): C[A]
- type Value = (Long, Deferred[R])
+ def success(defer: M[A], value: A): Unit
+
+ def failure(defer: M[A], cause: Throwable): Unit
+
+ def onSuccess(defer: C[A])(pf: PartialFunction[A, A])(implicit ec: ExecutionContext)
+
+ def onFailure(defer: C[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext)
+}
+
+object CanDefer {
+ implicit def implFuture[A] = new CanDefer[A, Promise, Future] {
+ override def promise: Promise[A] = Promise[A]()
+
+ override def future(defer: Promise[A]): Future[A] = defer.future
+
+ override def success(defer: Promise[A], value: A) = defer.success(value)
+
+ override def failure(defer: Promise[A], cause: Throwable) = defer.failure(cause)
+
+ override def onSuccess(defer: Future[A])(pf: PartialFunction[A, A])(implicit ec: ExecutionContext) = defer onSuccess pf
+
+ override def onFailure(defer: Future[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext) = defer onFailure pf
+ }
+
+ implicit def implDeferred[A] = new CanDefer[A, Deferred, Deferred] {
+
+ override def promise: Deferred[A] = new Deferred[A]()
+
+ override def future(defer: Deferred[A]): Deferred[A] = defer
+
+ override def success(defer: Deferred[A], value: A) = defer.callback(value)
+
+ override def failure(defer: Deferred[A], cause: Throwable) = defer.callback(cause)
+
+ override def onSuccess(defer: Deferred[A])(pf: PartialFunction[A, A])(implicit _ec: ExecutionContext) =
+ defer.addCallback(new Callback[A, A] {
+ override def call(arg: A): A = pf(arg)
+ })
+
+ override def onFailure(defer: Deferred[A])(pf: PartialFunction[Throwable, A])(implicit ec: ExecutionContext) =
+ defer.addErrback(new Callback[A, Exception] {
+ override def call(t: Exception): A = pf(t)
+ })
+ }
+}
+
+object DeferCache {
+ private val scheduledThreadPool = Executors.newSingleThreadScheduledExecutor()
+
+ def addScheduleJob(delay: Long)(block: => Unit) =
+ scheduledThreadPool.scheduleWithFixedDelay(new Runnable {
+ override def run(): Unit = block
+ }, 1000, delay, TimeUnit.MILLISECONDS)
+}
+
+/**
+ * @param config
+ * @param ec
+ * @param canDefer: implicit evidence to find out implementation of CanDefer.
+ * @tparam A: actual element type that will be stored in M[_] and C[_].
+ * @tparam M[_]: container type that will be stored in local cache. ex) Promise, Defer.
+ * @tparam C[_]: container type that will be returned to client of this class. Ex) Future, Defer.
+ */
+class DeferCache[A, M[_], C[_]](config: Config, empty: => A, name: String = "", useMetric: Boolean = false)(implicit ec: ExecutionContext, canDefer: CanDefer[A, M, C]) {
+ type Value = (Long, C[A])
private val maxSize = config.getInt("future.cache.max.size")
+ private val metricInterval = config.getInt("future.cache.metric.interval")
private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
- private val futureCache = CacheBuilder.newBuilder()
- .initialCapacity(maxSize)
- .concurrencyLevel(Runtime.getRuntime.availableProcessors())
- .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
- .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
- .maximumSize(maxSize).build[java.lang.Long, (Long, Deferred[R])]()
-
+ private val futureCache = {
+ val builder = CacheBuilder.newBuilder()
+ .initialCapacity(maxSize)
+ .concurrencyLevel(Runtime.getRuntime.availableProcessors())
+ .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+ .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
+ .maximumSize(maxSize)
+
+ if (useMetric && metricInterval > 0) {
+ val cache = builder.recordStats().build[java.lang.Long, (Long, M[A])]()
+ DeferCache.addScheduleJob(delay = metricInterval) { logger.metric(s"${name}: ${cache.stats()}") }
+ cache
+ } else {
+ builder.build[java.lang.Long, (Long, M[A])]()
+ }
+ }
def asMap() = futureCache.asMap()
- def getIfPresent(cacheKey: Long): Value = futureCache.getIfPresent(cacheKey)
+ def getIfPresent(cacheKey: Long): Value = {
+ val (cachedAt, promise) = futureCache.getIfPresent(cacheKey)
+ (cachedAt, canDefer.future(promise))
+ }
private def checkAndExpire(cacheKey: Long,
cachedAt: Long,
cacheTTL: Long,
- oldDefer: Deferred[R])(op: => Deferred[R]): Deferred[R] = {
+ oldFuture: C[A])(op: => C[A]): C[A] = {
if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
// future is too old. so need to expire and fetch new data from storage.
futureCache.asMap().remove(cacheKey)
- val newPromise = new Deferred[R]()
+ val promise = canDefer.promise
val now = System.currentTimeMillis()
- futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
+ futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
case null =>
// only one thread succeed to come here concurrently
// initiate fetch to storage then add callback on complete to finish promise.
- op withCallback { value =>
- newPromise.callback(value)
+ val result = op
+ canDefer.onSuccess(result) { case value =>
+ canDefer.success(promise, value)
value
}
- newPromise
- case (cachedAt, oldDefer) => oldDefer
+
+ canDefer.onFailure(result) { case e: Throwable =>
+ canDefer.failure(promise, e)
+ empty
+ }
+
+ canDefer.future(promise)
+
+ case (cachedAt, oldPromise) => canDefer.future(oldPromise)
}
} else {
// future is not to old so reuse it.
- oldDefer
+ oldFuture
}
}
- def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Deferred[R]): Deferred[R] = {
+
+ def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => C[A]): C[A] = {
val cacheVal = futureCache.getIfPresent(cacheKey)
cacheVal match {
case null =>
- val promise = new Deferred[R]()
+ val promise = canDefer.promise
val now = System.currentTimeMillis()
- val (cachedAt, defer) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
+
+ val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
case null =>
- op.withCallback { value =>
- promise.callback(value)
+ val result = op
+ canDefer.onSuccess(result) { case value =>
+ canDefer.success(promise, value)
value
}
+
+ canDefer.onFailure(result) { case e: Throwable =>
+ canDefer.failure(promise, e)
+ empty
+ }
+
(now, promise)
+
case oldVal => oldVal
}
- checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
+ checkAndExpire(cacheKey, cacheTTL, cachedAt, canDefer.future(cachedPromise))(op)
- case (cachedAt, defer) =>
- checkAndExpire(cacheKey, cacheTTL, cachedAt, defer)(op)
+ case (cachedAt, cachedPromise) =>
+ checkAndExpire(cacheKey, cacheTTL, cachedAt, canDefer.future(cachedPromise))(op)
}
}
}
-
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
deleted file mode 100644
index d6c4de3..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/FutureCache.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.s2graph.core.utils
-
-import java.util.concurrent.TimeUnit
-
-import com.google.common.cache.CacheBuilder
-import com.typesafe.config.Config
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-
-
-class FutureCache[R](config: Config)(implicit ex: ExecutionContext) {
-
- type Value = (Long, Future[R])
-
- private val maxSize = config.getInt("future.cache.max.size")
- private val expireAfterWrite = config.getInt("future.cache.expire.after.write")
- private val expireAfterAccess = config.getInt("future.cache.expire.after.access")
-
- private val futureCache = CacheBuilder.newBuilder()
- .initialCapacity(maxSize)
- .concurrencyLevel(Runtime.getRuntime.availableProcessors())
- .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
- .expireAfterAccess(expireAfterAccess, TimeUnit.MILLISECONDS)
- .maximumSize(maxSize).build[java.lang.Long, (Long, Promise[R])]()
-
-
- def asMap() = futureCache.asMap()
-
- def getIfPresent(cacheKey: Long): Value = {
- val (cachedAt, promise) = futureCache.getIfPresent(cacheKey)
- (cachedAt, promise.future)
- }
-
- private def checkAndExpire(cacheKey: Long,
- cachedAt: Long,
- cacheTTL: Long,
- oldFuture: Future[R])(op: => Future[R]): Future[R] = {
- if (System.currentTimeMillis() >= cachedAt + cacheTTL) {
- // future is too old. so need to expire and fetch new data from storage.
- futureCache.asMap().remove(cacheKey)
-
- val newPromise = Promise[R]
- val now = System.currentTimeMillis()
-
- futureCache.asMap().putIfAbsent(cacheKey, (now, newPromise)) match {
- case null =>
- // only one thread succeed to come here concurrently
- // initiate fetch to storage then add callback on complete to finish promise.
- op.onSuccess { case value =>
- newPromise.success(value)
- value
- }
- newPromise.future
- case (cachedAt, oldPromise) => oldPromise.future
- }
- } else {
- // future is not to old so reuse it.
- oldFuture
- }
- }
- def getOrElseUpdate(cacheKey: Long, cacheTTL: Long)(op: => Future[R]): Future[R] = {
- val cacheVal = futureCache.getIfPresent(cacheKey)
- cacheVal match {
- case null =>
- val promise = Promise[R]
- val now = System.currentTimeMillis()
- val (cachedAt, cachedPromise) = futureCache.asMap().putIfAbsent(cacheKey, (now, promise)) match {
- case null =>
- op.onSuccess { case value =>
- promise.success(value)
- value
- }
- (now, promise)
- case oldVal => oldVal
- }
- checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
-
- case (cachedAt, cachedPromise) =>
- checkAndExpire(cacheKey, cacheTTL, cachedAt, cachedPromise.future)(op)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/09a7919d/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
index e37080e..4149540 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/utils/Logger.scala
@@ -50,6 +50,9 @@ object logger {
private val logger = LoggerFactory.getLogger("application")
private val errorLogger = LoggerFactory.getLogger("error")
+ private val metricLogger = LoggerFactory.getLogger("metrics")
+
+ def metric[T: Loggable](msg: => T) = metricLogger.info(implicitly[Loggable[T]].toLogMessage(msg))
def info[T: Loggable](msg: => T) = logger.info(implicitly[Loggable[T]].toLogMessage(msg))