You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2021/08/19 13:36:24 UTC
[incubator-tuweni] branch main updated: Add a new handler to poll endpoint directly
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new e748c26 Add a new handler to poll endpoint directly
new 330795f Merge pull request #340 from atoulme/add_polling_cache_handler
e748c26 is described below
commit e748c26e28a337a659d8a489d2c20b5cde96fc9d
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Aug 19 05:57:53 2021 -0700
Add a new handler to poll endpoint directly
---
.../kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt | 30 ++++++++-
.../tuweni/jsonrpc/methods/MethodsHandler.kt | 71 +++++++++++++++++++++-
.../tuweni/jsonrpc/methods/MethodsHandlerTest.kt | 31 +++++++++-
3 files changed, 127 insertions(+), 5 deletions(-)
diff --git a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
index 5305818..20d5804 100644
--- a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
+++ b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
@@ -19,10 +19,38 @@ package org.apache.tuweni.eth
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
+/**
+ * JSONRPCRequest represents a JSON-RPC request to a JSON-RPC service.
+ */
@JsonIgnoreProperties(ignoreUnknown = true)
data class JSONRPCRequest(
@JsonProperty("id") val id: Int,
@JsonProperty("method") val method: String,
@JsonProperty("params") val params: Array<String>,
@JsonProperty("jsonrpc") val jsonrpc: String = "2.0"
-)
+) {
+
+ companion object {
+ /**
+ * Deserialize a string into a JSON-RPC request.
+ * The request is incomplete, as no id is set.
+ *
+ * The serialized form follows this formula:
+ * <method>|<params, joined by comma>
+ *
+ * Example:
+ * - eth_getBlockByNumber|latest,true
+ */
+ fun deserialize(serialized: String): JSONRPCRequest {
+ val segments = serialized.split("|")
+ return JSONRPCRequest(id = 0, method = segments[0], params = segments[1].split(",").toTypedArray())
+ }
+ }
+
+ override fun equals(other: Any?) = other is JSONRPCRequest && this.method == other.method && params.contentEquals(other.params)
+ override fun hashCode() = 31 * method.hashCode() + params.contentHashCode()
+
+ fun serializeRequest(): String {
+ return this.method + "|" + this.params.joinToString(",")
+ }
+}
diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
index b881d76..45a18c3 100644
--- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
+++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
@@ -20,12 +20,18 @@ import com.netflix.concurrency.limits.limit.FixedLimit
import com.netflix.concurrency.limits.limiter.SimpleLimiter
import io.opentelemetry.api.metrics.LongCounter
import io.opentelemetry.api.metrics.common.Labels
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
import org.apache.tuweni.eth.JSONRPCRequest
import org.apache.tuweni.eth.JSONRPCResponse
import org.apache.tuweni.eth.methodNotEnabled
import org.apache.tuweni.eth.methodNotFound
import org.apache.tuweni.eth.tooManyRequests
import org.apache.tuweni.kv.KeyValueStore
+import org.slf4j.LoggerFactory
+import kotlin.coroutines.CoroutineContext
class MethodsRouter(val methodsMap: Map<String, suspend (JSONRPCRequest) -> JSONRPCResponse>) {
@@ -123,7 +129,7 @@ class CachingHandler(
if (!found) {
return delegateHandler(request)
} else {
- val serializedRequest = serializeRequest(request)
+ val serializedRequest = request.serializeRequest()
val response = cacheStore.get(serializedRequest)
return if (response == null) {
cacheMissCounter.add(1)
@@ -138,8 +144,67 @@ class CachingHandler(
}
}
}
+}
+
+class CachingPollingHandler(
+ private val cachedRequests: List<JSONRPCRequest>,
+ private val pollPeriodMillis: Long,
+ private val cacheStore: KeyValueStore<JSONRPCRequest, JSONRPCResponse>,
+ private val cacheHitCounter: LongCounter,
+ private val cacheMissCounter: LongCounter,
+ override val coroutineContext: CoroutineContext = Dispatchers.Default,
+ private val delegateHandler: suspend (JSONRPCRequest) -> JSONRPCResponse,
+) : CoroutineScope {
+
+ companion object {
+ private val logger = LoggerFactory.getLogger(CachingPollingHandler::class.java)
+ }
+
+ init {
+ poll()
+ }
+
+ private fun poll() {
+ launch {
+ try {
+ var id = 1337
+ for (cachedRequest in cachedRequests) {
+ val newResponse = delegateHandler(cachedRequest.copy(id = id))
+ id++
+ if (newResponse.error == null) {
+ cacheStore.put(cachedRequest, newResponse)
+ } else {
+ logger.warn("{}, got error:\n{}", cachedRequest, newResponse.error)
+ }
+ }
+ } catch (e: Exception) {
+ logger.error("Error polling JSON-RPC endpoint", e)
+ }
+ delay(pollPeriodMillis)
+ poll()
+ }
+ }
- private fun serializeRequest(request: JSONRPCRequest): String {
- return request.method + "|" + request.params.joinToString(",")
+ suspend fun handleRequest(request: JSONRPCRequest): JSONRPCResponse {
+ var found = false
+ if (cachedRequests.contains(request)) {
+ found = true
+ }
+ if (!found) {
+ return delegateHandler(request)
+ } else {
+ val response = cacheStore.get(request)
+ if (response == null) {
+ cacheMissCounter.add(1)
+ val newResponse = delegateHandler(request)
+ if (newResponse.error == null) {
+ cacheStore.put(request, newResponse)
+ }
+ return newResponse
+ } else {
+ cacheHitCounter.add(1)
+ return response.copy(id = request.id)
+ }
+ }
}
}
diff --git a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
index 8624a03..ef88102 100644
--- a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
+++ b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
@@ -176,7 +176,7 @@ class CachingHandlerTest {
val meterSdk = SdkMeterProvider.builder().build()
val meter = meterSdk.get("handler")
val handler = CachingHandler(listOf("foo"), kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) {
- if (it.params.size > 0) {
+ if (it.params.isNotEmpty()) {
JSONRPCResponse(id = 1, error = JSONRPCError(1234, ""))
} else {
JSONRPCResponse(id = 1)
@@ -193,3 +193,32 @@ class CachingHandlerTest {
assertEquals(1, map.size)
}
}
+
+class CachingPollingHandlerTest {
+
+ @Test
+ fun testCache() = runBlocking {
+ val map = HashMap<JSONRPCRequest, JSONRPCResponse>()
+ val kv = MapKeyValueStore.open(map)
+ val meterSdk = SdkMeterProvider.builder().build()
+ val meter = meterSdk.get("handler")
+ val handler = CachingPollingHandler(listOf(JSONRPCRequest(1, "foo", arrayOf())), 1000, kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) {
+ if (it.params.isNotEmpty()) {
+ JSONRPCResponse(id = 1, error = JSONRPCError(1234, ""))
+ } else {
+ JSONRPCResponse(id = 1)
+ }
+ }
+ delay(500)
+ assertEquals(1, map.size)
+ handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf()))
+ assertEquals(1, map.size)
+ handler.handleRequest(JSONRPCRequest(id = 1, method = "bar", params = arrayOf()))
+ assertEquals(1, map.size)
+ handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf()))
+ assertEquals(1, map.size)
+ val errorResp = handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf("bleh")))
+ assertEquals(1, map.size)
+ assertNotNull(errorResp.error)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org