You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/12/13 05:20:56 UTC
[incubator-tuweni] branch master updated: remove unused classes
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new ae69233 remove unused classes
new 1f91a08 Merge pull request #183 from atoulme/remove_unused
ae69233 is described below
commit ae692337f0d68521fc03c18a5d9ca7e846e7c421
Author: Antoine Toulme <an...@lunar-ocean.com>
AuthorDate: Sat Dec 12 20:41:06 2020 -0800
remove unused classes
---
.../tuweni/concurrent/coroutines/AsyncResult.kt | 14 ---
.../tuweni/concurrent/coroutines/CoroutineLatch.kt | 105 ------------------
.../concurrent/coroutines/CoroutineLatchTest.kt | 118 ---------------------
.../org/apache/tuweni/devp2p/DiscoveryService.kt | 27 ++---
4 files changed, 6 insertions(+), 258 deletions(-)
diff --git a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
index ce0a78a..c5e1d66 100644
--- a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
+++ b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/AsyncResult.kt
@@ -27,11 +27,9 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.tuweni.concurrent.AsyncResult
-import org.apache.tuweni.concurrent.CompletableAsyncResult
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletionException
import java.util.function.BiConsumer
-import kotlin.Result
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
@@ -77,18 +75,6 @@ fun <T> CoroutineScope.asyncResult(
return asyncResult
}
-private class AsyncResultCoroutine<T>(
- override val context: CoroutineContext,
- val asyncResult: CompletableAsyncResult<T> = AsyncResult.incomplete()
-) : Continuation<T>, CoroutineScope {
- override val coroutineContext: CoroutineContext get() = context
- override fun resumeWith(result: Result<T>) {
- result
- .onSuccess { asyncResult.complete(it) }
- .onFailure { asyncResult.completeExceptionally(it) }
- }
-}
-
/**
* Converts this deferred value to an [AsyncResult].
* The deferred value is cancelled when the returned [AsyncResult] is cancelled or otherwise completed.
diff --git a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt b/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt
deleted file mode 100644
index 0dfd10d..0000000
--- a/concurrent-coroutines/src/main/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatch.kt
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.concurrent.coroutines
-
-import kotlinx.coroutines.suspendCancellableCoroutine
-import java.util.concurrent.atomic.AtomicInteger
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-
-/**
- * A co-routine synchronization aid that allows co-routines to wait until a set of operations being performed
- * has completed.
- *
- * The latch is initialized with a given count. If the latch count is greater than zero, the `await()` method will
- * suspend until the count reaches zero due to invocations of the `countDown()` method, at which point all suspended
- * co-routines will be resumed.
- *
- * Unlike the Java `CountDownLatch`, this latch allows the count to be increased via invocation of the `countUp()`
- * method. Increasing the count from zero will result in calls to `await()` suspending again. Note that the count may
- * be negative, requiring multiple calls to `countUp()` before calls to `await()` suspend.
- *
- * @param initial The initial count of the latch, which may be positive, zero, or negative.
- * @constructor A latch.
- */
-class CoroutineLatch(initial: Int) {
-
- private val atomicCount = AtomicInteger(initial)
- private var waitingCoroutines = mutableListOf<Continuation<Unit>>()
-
- /**
- * The current latch count.
- */
- val count: Int
- get() = atomicCount.get()
-
- /**
- * Indicates if the latch is open (`count <= 0`).
- */
- val isOpen: Boolean
- get() = atomicCount.get() <= 0
-
- /**
- * Decrease the latch count, potentially opening the latch and awakening suspending co-routines.
- *
- * @return `true` if the latch was opened as a result of this invocation.
- */
- fun countDown(): Boolean {
- var toAwaken: List<Continuation<Unit>>? = null
- synchronized(this) {
- if (atomicCount.decrementAndGet() == 0) {
- toAwaken = waitingCoroutines
- waitingCoroutines = mutableListOf()
- }
- }
- toAwaken?.forEach { it.resume(Unit) }
- return toAwaken != null
- }
-
- /**
- * Increase the latch count, potentially closing the latch.
- *
- * @return `true` if the latch was closed as a result of this invocation.
- */
- fun countUp(): Boolean = atomicCount.incrementAndGet() == 1
-
- /**
- * Await the latch opening. If already open, return without suspending.
- */
- suspend fun await() {
- if (atomicCount.get() <= 0) {
- return
- }
- suspendCancellableCoroutine { cont: Continuation<Unit> ->
- try {
- var suspended: Boolean
- synchronized(this) {
- suspended = atomicCount.get() > 0
- if (suspended) {
- waitingCoroutines.add(cont)
- }
- }
- if (!suspended) {
- cont.resume(Unit)
- }
- } catch (e: Throwable) {
- cont.resumeWithException(e)
- }
- }
- }
-}
diff --git a/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt b/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt
deleted file mode 100644
index 61ab68c..0000000
--- a/concurrent-coroutines/src/test/kotlin/org/apache/tuweni/concurrent/coroutines/CoroutineLatchTest.kt
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tuweni.concurrent.coroutines
-
-import kotlinx.coroutines.TimeoutCancellationException
-import kotlinx.coroutines.async
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withTimeout
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Assertions.assertFalse
-import org.junit.jupiter.api.Assertions.assertTrue
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.assertThrows
-
-internal class CoroutineLatchTest {
-
- @Test
- fun shouldntSuspendWhenLatchIsOpen() = runBlocking {
- withTimeout(1) {
- CoroutineLatch(0).await()
- }
- withTimeout(1) {
- CoroutineLatch(-1).await()
- }
- }
-
- @Test
- fun shouldUnsuspendWhenLatchOpens() = runBlocking {
- val latch = CoroutineLatch(2)
- assertFalse(latch.isOpen)
- assertEquals(2, latch.count)
-
- var ok = false
- var done = false
- val job = async {
- latch.await()
- assertTrue(ok, "failed to suspend")
- done = true
- }
-
- Thread.sleep(100)
- assertFalse(latch.countDown())
- assertFalse(latch.isOpen)
- assertEquals(1, latch.count)
-
- Thread.sleep(100)
- assertFalse(done, "woke up too early")
-
- ok = true
- assertTrue(latch.countDown())
- assertTrue(latch.isOpen)
- assertEquals(0, latch.count)
- job.await()
- assertTrue(done, "failed to wakeup")
- }
-
- @Test
- fun shouldSuspendWhenLatchCloses() = runBlocking {
- val latch = CoroutineLatch(-1)
- assertTrue(latch.isOpen)
- assertEquals(-1, latch.count)
-
- withTimeout(1) {
- latch.await()
- }
-
- assertFalse(latch.countUp())
- assertTrue(latch.isOpen)
- assertEquals(0, latch.count)
-
- withTimeout(1) {
- latch.await()
- }
-
- assertTrue(latch.countUp())
- assertFalse(latch.isOpen)
- assertEquals(1, latch.count)
-
- var ok = false
- var done = false
- val job = async {
- latch.await()
- assertTrue(ok, "failed to suspend")
- done = true
- }
-
- ok = true
- assertTrue(latch.countDown())
- assertTrue(latch.isOpen)
- job.await()
- assertTrue(done, "failed to wakeup")
- }
-
- @Test
- fun shouldTimeoutWhenBlocked() {
- assertThrows<TimeoutCancellationException> {
- runBlocking {
- withTimeout(1) {
- CoroutineLatch(1).await()
- }
- }
- }
- }
-}
diff --git a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
index c6816fc..b3a2770 100644
--- a/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
+++ b/devp2p/src/main/kotlin/org/apache/tuweni/devp2p/DiscoveryService.kt
@@ -48,7 +48,6 @@ import org.apache.tuweni.bytes.Bytes
import org.apache.tuweni.bytes.Bytes32
import org.apache.tuweni.concurrent.AsyncCompletion
import org.apache.tuweni.concurrent.AsyncResult
-import org.apache.tuweni.concurrent.coroutines.CoroutineLatch
import org.apache.tuweni.concurrent.coroutines.asyncCompletion
import org.apache.tuweni.concurrent.coroutines.asyncResult
import org.apache.tuweni.concurrent.coroutines.await
@@ -114,7 +113,7 @@ interface DiscoveryService {
* @param advertiseTcpPort the TCP port to advertise to peers, or `null` if it should be the same as the UDP port.
* @param routingTable a [PeerRoutingTable] which handles the ÐΞVp2p routing table
* @param packetFilter a filter for incoming packets
-\ * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger
+ \ * @param bufferAllocator a [ByteBuffer] allocator, which must return buffers of size 1280 bytes or larger
* @param timeSupplier a function supplying the current time, in milliseconds since the epoch
*/
@JvmOverloads
@@ -319,7 +318,6 @@ internal class CoroutineDiscoveryService constructor(
private var enr: Bytes? = null
private val shutdown = AtomicBoolean(false)
- private val activityLatch = CoroutineLatch(1)
private val bootstrapped = AsyncCompletion.incomplete()
private var refreshLoop: Job? = null
private val server = vertx.createDatagramSocket()
@@ -370,24 +368,14 @@ internal class CoroutineDiscoveryService constructor(
)
selfEndpoint = endpoint
refreshLoop = launch {
- activityLatch.countUp()
- try {
- while (true) {
- delay(REFRESH_INTERVAL_MS)
- refresh()
- }
- } finally {
- activityLatch.countDown()
+ while (true) {
+ delay(REFRESH_INTERVAL_MS)
+ refresh()
}
}
launch {
bootstrapURIs.map { uri ->
- activityLatch.countUp()
- try {
- bootstrapFrom(uri)
- } finally {
- activityLatch.countDown()
- }
+ bootstrapFrom(uri)
}
bootstrapped.complete()
}
@@ -432,16 +420,14 @@ internal class CoroutineDiscoveryService constructor(
return
}
- activityLatch.countUp()
val arrivalTime = timeSupplier()
- val job = launch {
+ launch {
try {
receivePacket(packet.data(), packet.sender(), arrivalTime)
} catch (e: Throwable) {
logger.error("$serviceDescriptor: unexpected error during packet handling", e)
}
}
- job.invokeOnCompletion { activityLatch.countDown() }
}
override suspend fun awaitBootstrap() = bootstrapped.await()
@@ -460,7 +446,6 @@ internal class CoroutineDiscoveryService constructor(
verifyingEndpoints.cleanUp()
findNodeStates.invalidateAll()
findNodeStates.cleanUp()
- activityLatch.countDown()
refreshLoop?.cancel()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org