You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuweni.apache.org by to...@apache.org on 2020/06/27 18:40:06 UTC
[incubator-tuweni] branch master updated: remove AtomicSlotMap
(#102)
This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebd7d0 remove AtomicSlotMap (#102)
1ebd7d0 is described below
commit 1ebd7d059d4b4b2fbbc6192f7ea5e099fa799b71
Author: Antoine Toulme <at...@users.noreply.github.com>
AuthorDate: Sat Jun 27 11:39:58 2020 -0700
remove AtomicSlotMap (#102)
* remove AtomicSlotMap
* update junit, add timeouts to all tests that tend to create trouble
---
.../apache/tuweni/concurrent/AtomicSlotMap.java | 253 ---------------------
.../tuweni/concurrent/AtomicSlotMapTest.java | 130 -----------
dependency-versions.gradle | 2 +-
.../tuweni/devp2p/DiscoveryServiceJavaTest.java | 2 +
.../tuweni/devp2p/v5/NodeDiscoveryServiceTest.java | 2 +
.../apache/tuweni/devp2p/DiscoveryServiceTest.kt | 2 +
.../devp2p/v5/DefaultNodeDiscoveryServiceTest.kt | 2 +
.../org/apache/tuweni/devp2p/v5/IntegrationTest.kt | 2 +
.../devp2p/v5/internal/DefaultUdpConnectorTest.kt | 2 +
.../tuweni/devp2p/v5/topic/TopicIntegrationTest.kt | 2 +
10 files changed, 15 insertions(+), 384 deletions(-)
diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/AtomicSlotMap.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/AtomicSlotMap.java
deleted file mode 100644
index 65682bf..0000000
--- a/concurrent/src/main/java/org/apache/tuweni/concurrent/AtomicSlotMap.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.tuweni.concurrent;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-
-import com.google.common.collect.DiscreteDomain;
-
-/**
- * An atomic map that locates available keys within a {@link DiscreteDomain}.
- *
- * <p>
- * This is an atomic map that will allocate key slots based on availability. It will attempt to keep the range compact
- * by filling slots as they become available.
- * <p>
- * This implementation should be used with small sets, as addition is an O(N) operation.
- *
- * @param <K> The type of the map keys.
- * @param <V> The type of values to store in the map.
- */
-@SuppressWarnings("rawtypes") // allow ungenerified Comparable types
-public final class AtomicSlotMap<K extends Comparable, V> {
- private final DiscreteDomain<K> domain;
- private final ConcurrentHashMap<K, Optional<V>> slots = new ConcurrentHashMap<>();
- private final AtomicInteger size = new AtomicInteger(0);
-
- /**
- * Create a slot map over the range of integers > 0.
- *
- * @param <V> The type of values to store in the map.
- * @return A new slot map.
- */
- public static <V> AtomicSlotMap<Integer, V> positiveIntegerSlots() {
- return new AtomicSlotMap<>(PositiveIntegerDomain.INSTANCE);
- }
-
- /**
- * Create a slot map over the provided domain.
- *
- * @param domain The {@link DiscreteDomain} that defines the slots to be used.
- */
- public AtomicSlotMap(DiscreteDomain<K> domain) {
- requireNonNull(domain);
- this.domain = domain;
- }
-
- /**
- * Add a value to the slot map, using the first available slot.
- *
- * @param value The value.
- * @return The slot that was used to store the value.
- */
- public K add(V value) {
- requireNonNull(value);
- K slot = domain.minValue();
- Optional<V> storedValue = Optional.of(value);
- while (slots.containsKey(slot) || slots.putIfAbsent(slot, storedValue) != null) {
- slot = domain.next(slot);
- }
- size.incrementAndGet();
- return slot;
- }
-
- /**
- * Put a value into a specific slot.
- *
- * @param slot The slot to put the value in.
- * @param value The value.
- * @return The previous value in the slot, if present.
- */
- @Nullable
- public V put(K slot, V value) {
- requireNonNull(slot);
- requireNonNull(value);
- Optional<V> previous = slots.put(slot, Optional.of(value));
- if (previous == null || !previous.isPresent()) {
- size.incrementAndGet();
- return null;
- }
- return previous.get();
- }
-
- /**
- * Find a slot and compute a value for it.
- *
- * @param fn A function to compute the value for a slot.
- * @return The slot for which the value was computed.
- */
- public K compute(Function<? super K, ? extends V> fn) {
- requireNonNull(fn);
- K slot = domain.minValue();
- // store an empty optional to prevent contention on the slot, then replace with computed value.
- Optional<V> placeholder = Optional.empty();
- while (slots.containsKey(slot) || slots.putIfAbsent(slot, placeholder) != null) {
- slot = domain.next(slot);
- }
- try {
- if (slots.replace(slot, placeholder, Optional.of(fn.apply(slot)))) {
- size.incrementAndGet();
- }
- return slot;
- } catch (Throwable ex) {
- slots.remove(slot, placeholder);
- throw ex;
- }
- }
-
- /**
- * Find a slot and compute a value for it.
- *
- * @param fn A function to compute the value for a slot.
- * @return A result that will complete with the slot for which the value was computed.
- */
- public AsyncResult<K> computeAsync(Function<? super K, AsyncResult<? extends V>> fn) {
- requireNonNull(fn);
- K slot = domain.minValue();
- // store an empty optional to prevent contention on the slot, then replace with computed value.
- Optional<V> placeholder = Optional.empty();
- while (slots.containsKey(slot) || slots.putIfAbsent(slot, placeholder) != null) {
- slot = domain.next(slot);
- }
- K finalSlot = slot;
- try {
- return fn.apply(finalSlot).thenApply(value -> {
- if (slots.replace(finalSlot, placeholder, Optional.of(value))) {
- size.incrementAndGet();
- }
- return finalSlot;
- });
- } catch (Throwable ex) {
- slots.remove(finalSlot, placeholder);
- throw ex;
- }
- }
-
- /**
- * Get the value in a slot.
- *
- * @param slot The slot.
- * @return The value, if present.
- */
- @Nullable
- public V get(K slot) {
- requireNonNull(slot);
- Optional<V> value = slots.get(slot);
- if (value == null) {
- return null;
- }
- return value.orElse(null);
- }
-
- /**
- * Remove a value from a slot, making the slot available again.
- *
- * @param slot The slot.
- * @return The value that was in the slot, if any.
- */
- @Nullable
- public V remove(K slot) {
- requireNonNull(slot);
- Optional<V> previous = slots.remove(slot);
- if (previous == null || !previous.isPresent()) {
- return null;
- }
- size.decrementAndGet();
- return previous.get();
- }
-
- /**
- * @return The number of slots filled.
- */
- public int size() {
- return size.get();
- }
-
- /**
- * @return A stream over the entries in the slot map.
- */
- public Stream<Map.Entry<K, V>> entries() {
- return slots.entrySet().stream().filter(e -> e.getValue().isPresent()).map(e -> new Map.Entry<K, V>() {
- @Override
- public K getKey() {
- return e.getKey();
- }
-
- @Override
- public V getValue() {
- return e.getValue().get();
- }
-
- @Override
- public V setValue(Object value) {
- throw new UnsupportedOperationException();
- }
- });
- }
-
- /**
- * @return A stream over the values stored in the slot map.
- */
- public Stream<V> values() {
- return slots.values().stream().filter(Optional::isPresent).map(Optional::get);
- }
-
- private static final class PositiveIntegerDomain extends DiscreteDomain<Integer> {
- private static final PositiveIntegerDomain INSTANCE = new PositiveIntegerDomain();
-
- @Override
- public Integer next(Integer value) {
- int i = value;
- return (i == Integer.MAX_VALUE) ? null : i + 1;
- }
-
- @Override
- public Integer previous(Integer value) {
- int i = value;
- return (i == 1) ? null : i - 1;
- }
-
- @Override
- public long distance(Integer start, Integer end) {
- return (long) end - start;
- }
-
- @Override
- public Integer minValue() {
- return 1;
- }
-
- @Override
- public Integer maxValue() {
- return Integer.MAX_VALUE;
- }
- }
-}
diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/AtomicSlotMapTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/AtomicSlotMapTest.java
deleted file mode 100644
index 4019b10..0000000
--- a/concurrent/src/test/java/org/apache/tuweni/concurrent/AtomicSlotMapTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.tuweni.concurrent;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.jupiter.api.Test;
-
-class AtomicSlotMapTest {
-
- @Test
- void shouldUseSlotsIncrementally() {
- AtomicSlotMap<Integer, String> slotMap = AtomicSlotMap.positiveIntegerSlots();
-
- assertEquals(1, (int) slotMap.add("value"));
- assertEquals(2, (int) slotMap.add("value"));
- assertEquals(3, (int) slotMap.add("value"));
- assertEquals(4, (int) slotMap.add("value"));
- }
-
- @Test
- void shouldReuseSlotsIncrementally() {
- AtomicSlotMap<Integer, String> slotMap = AtomicSlotMap.positiveIntegerSlots();
-
- assertEquals(1, (int) slotMap.add("value"));
- assertEquals(2, (int) slotMap.add("value"));
- assertEquals(3, (int) slotMap.add("value"));
- assertEquals(4, (int) slotMap.add("value"));
- slotMap.remove(2);
- slotMap.remove(4);
- assertEquals(2, (int) slotMap.add("value"));
- assertEquals(4, (int) slotMap.add("value"));
- }
-
- @Test
- void maintainsCountWhenConcurrentlyRemoving() {
- AtomicSlotMap<Integer, String> slotMap = AtomicSlotMap.positiveIntegerSlots();
- int firstSlot = slotMap.add("foo");
-
- CompletableAsyncResult<String> result = AsyncResult.incomplete();
- AtomicInteger secondSlot = new AtomicInteger();
- slotMap.computeAsync(s -> {
- secondSlot.set(s);
- return result;
- });
- assertEquals(1, slotMap.size());
-
- slotMap.remove(secondSlot.get());
- assertEquals(1, slotMap.size());
-
- result.complete("bar");
- assertEquals(1, slotMap.size());
-
- slotMap.remove(firstSlot);
- assertEquals(0, slotMap.size());
- }
-
- @Test
- void shouldNotDuplicateSlotsWhileAddingAndRemoving() throws Exception {
- AtomicSlotMap<Integer, String> slotMap = AtomicSlotMap.positiveIntegerSlots();
- Set<Integer> fastSlots = ConcurrentHashMap.newKeySet();
- Set<Integer> slowSlots = ConcurrentHashMap.newKeySet();
-
- Callable<Void> fastAdders = () -> {
- int slot = slotMap.add("a fast value");
- fastSlots.add(slot);
- return null;
- };
-
- Callable<Void> slowAdders = () -> {
- CompletableAsyncResult<String> result = AsyncResult.incomplete();
- slotMap.computeAsync(s -> result).thenAccept(slowSlots::add);
-
- Thread.sleep(10);
- result.complete("a slow value");
- return null;
- };
-
- Callable<Void> addAndRemovers = () -> {
- int slot = slotMap.add("a value");
- Thread.sleep(5);
- slotMap.remove(slot);
- return null;
- };
-
- ExecutorService fastPool = Executors.newFixedThreadPool(20);
- ExecutorService slowPool = Executors.newFixedThreadPool(20);
- ExecutorService addAndRemovePool = Executors.newFixedThreadPool(40);
- List<Future<Void>> fastFutures = fastPool.invokeAll(Collections.nCopies(1000, fastAdders));
- List<Future<Void>> slowFutures = slowPool.invokeAll(Collections.nCopies(1000, slowAdders));
- List<Future<Void>> addAndRemoveFutures = addAndRemovePool.invokeAll(Collections.nCopies(2000, addAndRemovers));
-
- for (Future<Void> future : addAndRemoveFutures) {
- future.get();
- }
- for (Future<Void> future : slowFutures) {
- future.get();
- }
- for (Future<Void> future : fastFutures) {
- future.get();
- }
-
- assertEquals(1000, fastSlots.size());
- assertEquals(1000, slowSlots.size());
- slowSlots.addAll(fastSlots);
- assertEquals(2000, slowSlots.size());
-
- assertEquals(2000, slotMap.size());
- }
-}
diff --git a/dependency-versions.gradle b/dependency-versions.gradle
index f7f75f8..e3356a7 100644
--- a/dependency-versions.gradle
+++ b/dependency-versions.gradle
@@ -43,7 +43,7 @@ dependencyManagement {
entry 'bcprov-jdk15on'
}
dependency('org.fusesource.leveldbjni:leveldbjni-all:1.8')
- dependencySet(group: 'org.junit.jupiter', version: '5.3.2') {
+ dependencySet(group: 'org.junit.jupiter', version: '5.6.2') {
entry 'junit-jupiter-api'
entry 'junit-jupiter-engine'
entry 'junit-jupiter-params'
diff --git a/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java b/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
index 6dbf19b..44d447e 100644
--- a/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
+++ b/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/DiscoveryServiceJavaTest.java
@@ -26,8 +26,10 @@ import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
+@Timeout(10)
@ExtendWith(BouncyCastleExtension.class)
class DiscoveryServiceJavaTest {
diff --git a/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/v5/NodeDiscoveryServiceTest.java b/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/v5/NodeDiscoveryServiceTest.java
index da1c200..dea10a4 100644
--- a/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/v5/NodeDiscoveryServiceTest.java
+++ b/devp2p/src/integrationTest/java/org/apache/tuweni/devp2p/v5/NodeDiscoveryServiceTest.java
@@ -18,8 +18,10 @@ import org.apache.tuweni.junit.BouncyCastleExtension;
import java.net.InetSocketAddress;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
+@Timeout(10)
@ExtendWith(BouncyCastleExtension.class)
public class NodeDiscoveryServiceTest {
diff --git a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
index e958823..9f51abd 100644
--- a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
+++ b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/DiscoveryServiceTest.kt
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetAddress
import java.net.InetSocketAddress
@@ -48,6 +49,7 @@ private suspend fun CoroutineDatagramChannel.receivePacket(): Packet {
return Packet.decodeFrom(buffer)
}
+@Timeout(10)
@ExtendWith(BouncyCastleExtension::class)
internal class DiscoveryServiceTest {
diff --git a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
index 15d6d88..e448415 100644
--- a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
+++ b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/DefaultNodeDiscoveryServiceTest.kt
@@ -28,12 +28,14 @@ import org.apache.tuweni.junit.BouncyCastleExtension
import org.apache.tuweni.net.coroutines.CoroutineDatagramChannel
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.time.Instant
+@Timeout(10)
@ExtendWith(BouncyCastleExtension::class)
class DefaultNodeDiscoveryServiceTest {
diff --git a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
index d51be67..351cde9 100644
--- a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
+++ b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/IntegrationTest.kt
@@ -23,7 +23,9 @@ import org.apache.tuweni.devp2p.v5.packet.PongMessage
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Timeout
+@Timeout(10)
class IntegrationTest : AbstractIntegrationTest() {
@Test
diff --git a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
index d538784..9ec2c7f 100644
--- a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
+++ b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/internal/DefaultUdpConnectorTest.kt
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.parallel.Execution
import org.junit.jupiter.api.parallel.ExecutionMode
@@ -43,6 +44,7 @@ import java.net.InetAddress
import java.net.InetSocketAddress
import java.nio.ByteBuffer
+@Timeout(10)
@Disabled
@ObsoleteCoroutinesApi
@ExtendWith(BouncyCastleExtension::class)
diff --git a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicIntegrationTest.kt b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicIntegrationTest.kt
index 3057166..22a7f7c 100644
--- a/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicIntegrationTest.kt
+++ b/devp2p/src/integrationTest/kotlin/org/apache/tuweni/devp2p/v5/topic/TopicIntegrationTest.kt
@@ -31,8 +31,10 @@ import org.apache.tuweni.devp2p.v5.packet.UdpMessage
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.Timeout
import java.net.InetAddress
+@Timeout(10)
class TopicIntegrationTest : AbstractIntegrationTest() {
@Disabled("Blocks testing")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org
For additional commands, e-mail: commits-help@tuweni.apache.org