You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2021/08/19 13:36:24 UTC

[incubator-tuweni] branch main updated: Add a new handler to poll endpoint directly

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/main by this push:
     new e748c26  Add a new handler to poll endpoint directly
     new 330795f  Merge pull request #340 from atoulme/add_polling_cache_handler
e748c26 is described below

commit e748c26e28a337a659d8a489d2c20b5cde96fc9d
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Thu Aug 19 05:57:53 2021 -0700

    Add a new handler to poll endpoint directly
---
 .../kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt | 30 ++++++++-
 .../tuweni/jsonrpc/methods/MethodsHandler.kt       | 71 +++++++++++++++++++++-
 .../tuweni/jsonrpc/methods/MethodsHandlerTest.kt   | 31 +++++++++-
 3 files changed, 127 insertions(+), 5 deletions(-)

diff --git a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
index 5305818..20d5804 100644
--- a/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
+++ b/eth/src/main/kotlin/org/apache/tuweni/eth/JSONRPCRequest.kt
@@ -19,10 +19,38 @@ package org.apache.tuweni.eth
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
 import com.fasterxml.jackson.annotation.JsonProperty
 
+/**
+ * JSONRPCRequest represents a JSON-RPC request to a JSON-RPC service.
+ */
 @JsonIgnoreProperties(ignoreUnknown = true)
 data class JSONRPCRequest(
   @JsonProperty("id") val id: Int,
   @JsonProperty("method") val method: String,
   @JsonProperty("params") val params: Array<String>,
   @JsonProperty("jsonrpc") val jsonrpc: String = "2.0"
-)
+) {
+
+  companion object {
+    /**
+     * Deserialize a string into a JSON-RPC request.
+     * The request is incomplete, as no id is set.
+     *
+     * The serialized form follows this formula:
+     * <method>|<params, joined by comma>
+     *
+     * Example:
+     * - eth_getBlockByNumber|latest,true
+     */
+    fun deserialize(serialized: String): JSONRPCRequest {
+      val segments = serialized.split("|")
+      return JSONRPCRequest(id = 0, method = segments[0], params = segments[1].split(",").toTypedArray())
+    }
+  }
+
+  override fun equals(other: Any?) = other is JSONRPCRequest && this.method == other.method && params.contentEquals(other.params)
+  override fun hashCode() = 31 * method.hashCode() + params.contentHashCode()
+
+  fun serializeRequest(): String {
+    return this.method + "|" + this.params.joinToString(",")
+  }
+}
diff --git a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
index b881d76..45a18c3 100644
--- a/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
+++ b/jsonrpc/src/main/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandler.kt
@@ -20,12 +20,18 @@ import com.netflix.concurrency.limits.limit.FixedLimit
 import com.netflix.concurrency.limits.limiter.SimpleLimiter
 import io.opentelemetry.api.metrics.LongCounter
 import io.opentelemetry.api.metrics.common.Labels
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
 import org.apache.tuweni.eth.JSONRPCRequest
 import org.apache.tuweni.eth.JSONRPCResponse
 import org.apache.tuweni.eth.methodNotEnabled
 import org.apache.tuweni.eth.methodNotFound
 import org.apache.tuweni.eth.tooManyRequests
 import org.apache.tuweni.kv.KeyValueStore
+import org.slf4j.LoggerFactory
+import kotlin.coroutines.CoroutineContext
 
 class MethodsRouter(val methodsMap: Map<String, suspend (JSONRPCRequest) -> JSONRPCResponse>) {
 
@@ -123,7 +129,7 @@ class CachingHandler(
     if (!found) {
       return delegateHandler(request)
     } else {
-      val serializedRequest = serializeRequest(request)
+      val serializedRequest = request.serializeRequest()
       val response = cacheStore.get(serializedRequest)
       return if (response == null) {
         cacheMissCounter.add(1)
@@ -138,8 +144,67 @@ class CachingHandler(
       }
     }
   }
+}
+
+class CachingPollingHandler(
+  private val cachedRequests: List<JSONRPCRequest>,
+  private val pollPeriodMillis: Long,
+  private val cacheStore: KeyValueStore<JSONRPCRequest, JSONRPCResponse>,
+  private val cacheHitCounter: LongCounter,
+  private val cacheMissCounter: LongCounter,
+  override val coroutineContext: CoroutineContext = Dispatchers.Default,
+  private val delegateHandler: suspend (JSONRPCRequest) -> JSONRPCResponse,
+) : CoroutineScope {
+
+  companion object {
+    private val logger = LoggerFactory.getLogger(CachingPollingHandler::class.java)
+  }
+
+  init {
+    poll()
+  }
+
+  private fun poll() {
+    launch {
+      try {
+        var id = 1337
+        for (cachedRequest in cachedRequests) {
+          val newResponse = delegateHandler(cachedRequest.copy(id = id))
+          id++
+          if (newResponse.error == null) {
+            cacheStore.put(cachedRequest, newResponse)
+          } else {
+            logger.warn("{}, got error:\n{}", cachedRequest, newResponse.error)
+          }
+        }
+      } catch (e: Exception) {
+        logger.error("Error polling JSON-RPC endpoint", e)
+      }
+      delay(pollPeriodMillis)
+      poll()
+    }
+  }
 
-  private fun serializeRequest(request: JSONRPCRequest): String {
-    return request.method + "|" + request.params.joinToString(",")
+  suspend fun handleRequest(request: JSONRPCRequest): JSONRPCResponse {
+    var found = false
+    if (cachedRequests.contains(request)) {
+      found = true
+    }
+    if (!found) {
+      return delegateHandler(request)
+    } else {
+      val response = cacheStore.get(request)
+      if (response == null) {
+        cacheMissCounter.add(1)
+        val newResponse = delegateHandler(request)
+        if (newResponse.error == null) {
+          cacheStore.put(request, newResponse)
+        }
+        return newResponse
+      } else {
+        cacheHitCounter.add(1)
+        return response.copy(id = request.id)
+      }
+    }
   }
 }
diff --git a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
index 8624a03..ef88102 100644
--- a/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
+++ b/jsonrpc/src/test/kotlin/org/apache/tuweni/jsonrpc/methods/MethodsHandlerTest.kt
@@ -176,7 +176,7 @@ class CachingHandlerTest {
     val meterSdk = SdkMeterProvider.builder().build()
     val meter = meterSdk.get("handler")
     val handler = CachingHandler(listOf("foo"), kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) {
-      if (it.params.size > 0) {
+      if (it.params.isNotEmpty()) {
         JSONRPCResponse(id = 1, error = JSONRPCError(1234, ""))
       } else {
         JSONRPCResponse(id = 1)
@@ -193,3 +193,32 @@ class CachingHandlerTest {
     assertEquals(1, map.size)
   }
 }
+
+class CachingPollingHandlerTest {
+
+  @Test
+  fun testCache() = runBlocking {
+    val map = HashMap<JSONRPCRequest, JSONRPCResponse>()
+    val kv = MapKeyValueStore.open(map)
+    val meterSdk = SdkMeterProvider.builder().build()
+    val meter = meterSdk.get("handler")
+    val handler = CachingPollingHandler(listOf(JSONRPCRequest(1, "foo", arrayOf())), 1000, kv, meter.longCounterBuilder("foo").build(), meter.longCounterBuilder("bar").build()) {
+      if (it.params.isNotEmpty()) {
+        JSONRPCResponse(id = 1, error = JSONRPCError(1234, ""))
+      } else {
+        JSONRPCResponse(id = 1)
+      }
+    }
+    delay(500)
+    assertEquals(1, map.size)
+    handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf()))
+    assertEquals(1, map.size)
+    handler.handleRequest(JSONRPCRequest(id = 1, method = "bar", params = arrayOf()))
+    assertEquals(1, map.size)
+    handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf()))
+    assertEquals(1, map.size)
+    val errorResp = handler.handleRequest(JSONRPCRequest(id = 1, method = "foo", params = arrayOf("bleh")))
+    assertEquals(1, map.size)
+    assertNotNull(errorResp.error)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org