You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/12/13 05:20:56 UTC

[incubator-tuweni] branch master updated: remove unused classes

This is an automated email from the ASF dual-hosted git repository.

toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git


The following commit(s) were added to refs/heads/master by this push:
     new ae69233  remove unused classes
     new 1f91a08  Merge pull request #183 from atoulme/remove_unused
ae69233 is described below

commit ae692337f0d68521fc03c18a5d9ca7e846e7c421
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Dec 12 20:41:06 2020 -0800

    remove unused classes
---
 .../tuweni/concurrent/coroutines/AsyncResult.kt    |  14 ---
 .../tuweni/concurrent/coroutines/CoroutineLatch.kt | 105 ------------------
 .../concurrent/coroutines/CoroutineLatchTest.kt    | 118 ---------------------
 .../org/apache/tuweni/devp2p/DiscoveryService.kt   |  27 ++---
 4 files changed, 6 insertions(+), 258 deletions(-)

diff --git a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
index ce0a78a..c5e1d66 100644
--- a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
+++ b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
@@ -27,11 +27,9 @@ import kotlinx.coroutines.Job
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.suspendCancellableCoroutine
 import org.apache.tuweni.concurrent.AsyncResult
-import org.apache.tuweni.concurrent.CompletableAsyncResult
 import java.util.concurrent.CancellationException
 import java.util.concurrent.CompletionException
 import java.util.function.BiConsumer
-import kotlin.Result
 import kotlin.coroutines.Continuation
 import kotlin.coroutines.CoroutineContext
 import kotlin.coroutines.resume
@@ -77,18 +75,6 @@ fun <T> CoroutineScope.asyncResult(
   return asyncResult
 }
 
-private class AsyncResultCoroutine<T>(
-  override val context: CoroutineContext,
-  val asyncResult: CompletableAsyncResult<T> = AsyncResult.incomplete()
-) : Continuation<T>, CoroutineScope {
-  override val coroutineContext: CoroutineContext get() = context
-  override fun resumeWith(result: Result<T>) {
-    result
-      .onSuccess { asyncResult.complete(it) }
-      .onFailure { asyncResult.completeExceptionally(it) }
-  }
-}
-
 /**
  * Converts this deferred value to an [AsyncResult].
  * The deferred value is cancelled when the returned [AsyncResult] is cancelled or otherwise completed.
diff --git a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt
deleted file mode 100644
index 0dfd10d..0000000
--- a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.concurrent.coroutines
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-
-/**
- * A co-routine synchronization aid that allows co-routines to wait until a set of operations being performed
- * has completed.
- *
- * The latch is initialized with a given count. If the latch count is greater than zero, the `await()` method will
- * suspend until the count reaches zero due to invocations of the `countDown()` method, at which point all suspended
- * co-routines will be resumed.
- *
- * Unlike the Java `CountDownLatch`, this latch allows the count to be increased via invocation of the `countUp()`
- * method. Increasing the count from zero will result in calls to `await()` suspending again. Note that the count may
- * be negative, requiring multiple calls to `countUp()` before calls to `await()` suspend.
- *
- * @param initial The initial count of the latch, which may be positive, zero, or negative.
- * @constructor A latch.
- */
-class CoroutineLatch(initial: Int) {
-
-  private val atomicCount = AtomicInteger(initial)
-  private var waitingCoroutines = mutableListOf<Continuation<Unit>>()
-
-  /**
-   * The current latch count.
-   */
-  val count: Int
-    get() = atomicCount.get()
-
-  /**
-   * Indicates if the latch is open (`count <= 0`).
-   */
-  val isOpen: Boolean
-    get() = atomicCount.get() <= 0
-
-  /**
-   * Decrease the latch count, potentially opening the latch and awakening suspending co-routines.
-   *
-   * @return `true` if the latch was opened as a result of this invocation.
-   */
-  fun countDown(): Boolean {
-    var toAwaken: List<Continuation<Unit>>? = null
-    synchronized(this) {
-      if (atomicCount.decrementAndGet() == 0) {
-        toAwaken = waitingCoroutines
-        waitingCoroutines = mutableListOf()
-      }
-    }
-    toAwaken?.forEach { it.resume(Unit) }
-    return toAwaken != null
-  }
-
-  /**
-   * Increase the latch count, potentially closing the latch.
-   *
-   * @return `true` if the latch was closed as a result of this invocation.
-   */
-  fun countUp(): Boolean = atomicCount.incrementAndGet() == 1
-
-  /**
-   * Await the latch opening. If already open, return without suspending.
-   */
-  suspend fun await() {
-    if (atomicCount.get() <= 0) {
-      return
-    }
-    suspendCancellableCoroutine { cont: Continuation<Unit> ->
-      try {
-        var suspended: Boolean
-        synchronized(this) {
-          suspended = atomicCount.get() > 0
-          if (suspended) {
-            waitingCoroutines.add(cont)
-          }
-        }
-        if (!suspended) {
-          cont.resume(Unit)
-        }
-      } catch (e: Throwable) {
-        cont.resumeWithException(e)
-      }
-    }
-  }
-}
diff --git a/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt b/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt
deleted file mode 100644
index 61ab68c..0000000
--- a/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.concurrent.coroutines
-
-import kotlinx.coroutines.TimeoutCancellationException
-import kotlinx.coroutines.async
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withTimeout
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-
-internal class CoroutineLatchTest {
-
-  @Test
-  fun shouldntSuspendWhenLatchIsOpen() = runBlocking {
-    withTimeout(1) {
-      CoroutineLatch(0).await()
-    }
-    withTimeout(1) {
-      CoroutineLatch(-1).await()
-    }
-  }
-
-  @Test
-  fun shouldUnsuspendWhenLatchOpens() = runBlocking {
-    val latch = CoroutineLatch(2)
-    assertFalse(latch.isOpen)
-    assertEquals(2, latch.count)
-
-    var ok = false
-    var done = false
-    val job = async {
-      latch.await()
-      assertTrue(ok, "failed to suspend")
-      done = true
-    }
-
-    Thread.sleep(100)
-    assertFalse(latch.countDown())
-    assertFalse(latch.isOpen)
-    assertEquals(1, latch.count)
-
-    Thread.sleep(100)
-    assertFalse(done, "woke up too early")
-
-    ok = true
-    assertTrue(latch.countDown())
-    assertTrue(latch.isOpen)
-    assertEquals(0, latch.count)
-    job.await()
-    assertTrue(done, "failed to wakeup")
-  }
-
-  @Test
-  fun shouldSuspendWhenLatchCloses() = runBlocking {
-    val latch = CoroutineLatch(-1)
-    assertTrue(latch.isOpen)
-    assertEquals(-1, latch.count)
-
-    withTimeout(1) {
-      latch.await()
-    }
-
-    assertFalse(latch.countUp())
-    assertTrue(latch.isOpen)
-    assertEquals(0, latch.count)
-
-    withTimeout(1) {
-      latch.await()
-    }
-
-    assertTrue(latch.countUp())
-    assertFalse(latch.isOpen)
-    assertEquals(1, latch.count)
-
-    var ok = false
-    var done = false
-    val job = async {
-      latch.await()
-      assertTrue(ok, "failed to suspend")
-      done = true
-    }
-
-    ok = true
-    assertTrue(latch.countDown())
-    assertTrue(latch.isOpen)
-    job.await()
-    assertTrue(done, "failed to wakeup")
-  }
-
-  @Test
-  fun shouldTimeoutWhenBlocked() {
-    assertThrows<TimeoutCancellationException> {
-      runBlocking {
-        withTimeout(1) {
-          CoroutineLatch(1).await()
-        }
-      }
-    }
-  }
-}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
index c6816fc..b3a2770 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
@@ -48,7 +48,6 @@ import org.apache.tuweni.bytes.Bytes
 import org.apache.tuweni.bytes.Bytes32
 import org.apache.tuweni.concurrent.AsyncCompletion
 import org.apache.tuweni.concurrent.AsyncResult
-import org.apache.tuweni.concurrent.coroutines.CoroutineLatch
 import org.apache.tuweni.concurrent.coroutines.asyncCompletion
 import org.apache.tuweni.concurrent.coroutines.asyncResult
 import org.apache.tuweni.concurrent.coroutines.await
@@ -114,7 +113,7 @@ interface DiscoveryService {
      * @param advertiseTcpPort the TCP port to advertise to peers, or `null` if it should be the same as the UDP port.
      * @param routingTable a [PeerRoutingTable] which handles the ÐΞVp2p routing table
      * @param packetFilter a filter for incoming packets
-\     * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger
+     \     * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger
      * @param timeSupplier a function supplying the current time, in milliseconds since the epoch
      */
     @JvmOverloads
@@ -319,7 +318,6 @@ internal class CoroutineDiscoveryService constructor(
   private var enr: Bytes? = null
 
   private val shutdown = AtomicBoolean(false)
-  private val activityLatch = CoroutineLatch(1)
   private val bootstrapped = AsyncCompletion.incomplete()
   private var refreshLoop: Job? = null
   private val server = vertx.createDatagramSocket()
@@ -370,24 +368,14 @@ internal class CoroutineDiscoveryService constructor(
     )
     selfEndpoint = endpoint
     refreshLoop = launch {
-      activityLatch.countUp()
-      try {
-        while (true) {
-          delay(REFRESH_INTERVAL_MS)
-          refresh()
-        }
-      } finally {
-        activityLatch.countDown()
+      while (true) {
+        delay(REFRESH_INTERVAL_MS)
+        refresh()
       }
     }
     launch {
       bootstrapURIs.map { uri ->
-        activityLatch.countUp()
-        try {
-          bootstrapFrom(uri)
-        } finally {
-          activityLatch.countDown()
-        }
+        bootstrapFrom(uri)
       }
       bootstrapped.complete()
     }
@@ -432,16 +420,14 @@ internal class CoroutineDiscoveryService constructor(
       return
     }
 
-    activityLatch.countUp()
     val arrivalTime = timeSupplier()
-    val job = launch {
+    launch {
       try {
         receivePacket(packet.data(), packet.sender(), arrivalTime)
       } catch (e: Throwable) {
         logger.error("$serviceDescriptor: unexpected error during packet handling", e)
       }
     }
-    job.invokeOnCompletion { activityLatch.countDown() }
   }
 
   override suspend fun awaitBootstrap() = bootstrapped.await()
@@ -460,7 +446,6 @@ internal class CoroutineDiscoveryService constructor(
       verifyingEndpoints.cleanUp()
       findNodeStates.invalidateAll()
       findNodeStates.cleanUp()
-      activityLatch.countDown()
       refreshLoop?.cancel()
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org