Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2021/11/22 13:48:01 UTC

[GitHub] [cassandra] blambov commented on a change in pull request #1331: CASSANDRA-17071-trunk: Refactor keyspaces management in Schema

blambov commented on a change in pull request #1331:
URL: https://github.com/apache/cassandra/pull/1331#discussion_r754273358



##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();
+        else
+            return null;
+    }
+
+    /**
+     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
+     * It is guaranteed that the load function will be executed only once when the key is missing and multiple threads
+     * call this method for the same key.
+     * <p>
+     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
+     * throws an exception, it is rethrown by this method. In both cases nothing gets added to the map.
+     */
+    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
+    {
+        while (true)
+        {
+            Future<V> future = get(key);
+            AsyncPromise<V> newEntry = null;
+            if (future == null)
+            {
+                newEntry = new AsyncPromise<>();
+                future = putIfAbsent(key, newEntry);
+                if (future == null)
+                {
+                    // We managed to create an entry for the value. Now initialize it.
+                    future = newEntry;
+                    try
+                    {
+                        V v = loadFunction.get();
+                        if (v == null)
+                        {
+                            newEntry.setFailure(new NullPointerException("The mapping function returned null"));
+                            remove(key, future);
+                        }
+                        else
+                        {
+                            newEntry.setSuccess(v);
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        newEntry.setFailure(t);
+                        // Remove future so that construction can be retried later
+                        remove(key, future);
+                    }
+                }
+                else
+                {
+                    newEntry = null;
+                }
+
+                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
+            }
+
+            try
+            {
+                future.syncUninterruptibly();
+            }
+            catch (Throwable t)
+            {
+                // if blockingUnloadIfPresent was called in the meantime, we simply retry hoping that unloading gets
+                // finished soon
+                // also we retry if the concurrent attempt to load entry failed (but we do not retry if this attempt
+                // failed)
+                if (newEntry == null || Throwables.isCausedBy(t, ex -> ex instanceof KeyNotFoundException))
+                {
+                    Thread.yield();
+                    continue;
+                }
+            }
+
+            future.rethrowIfFailed();
+            return future.getNow();
+        }
+    }
+
+    /**
+     * If a value for the given key is present, unload function is run and the value is removed from the map.
+     * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for unload
+     * function.
+     * <p>
+     * When unload function fails, the value is removed from the map anyway and the failure is rethrown.
+     * <p>
+     * When the key was not found, the method returns {@code null}.
+     *
+     * @throws UnloadExecutionException when the unloading failed to complete - this is a checked exception because
+     *                                  the value is removed from the map regardless of the result of unloading; therefore if the unloading failed, the
+     *                                  caller is responsible for handling that; the {@link UnloadExecutionException} encapsulates the value which
+     *                                  failed to unload.
+     */
+    public V blockingUnloadIfPresent(K key, Consumer<? super V> unloadFunction) throws UnloadExecutionException
+    {
+        Promise<V> droppedFuture = new AsyncPromise<V>().setFailure(new KeyNotFoundException());
+
+        Future<V> existingFuture;
+        do
+        {
+            existingFuture = get(key);
+            if (existingFuture == null || existingFuture.cause() != null)
+                return null;
+        } while (!replace(key, existingFuture, droppedFuture));
+
+        V v = existingFuture.awaitUninterruptibly().getNow();
+        if (v == null)
+        {
+            // which means that either the value failed to load or a concurrent attempt to unload already did the work
+            return null;
+        }
+
+        try
+        {
+            unloadFunction.accept(v);
+            return v;
+        }
+        catch (Throwable t)
+        {
+            throw new UnloadExecutionException(v, t);
+        }
+        finally
+        {
+            Future<V> future = remove(key);
+            assert future == droppedFuture;
+        }
+    }
+
+    private static class KeyNotFoundException extends IllegalStateException

Review comment:
       I would name this `ConcurrentUnloadException`. Since you are catching and processing this in the loading method, though, it should not be necessary -- a completed `null` future would work just as well with slightly simpler code.
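
To illustrate the second suggestion, here is a minimal sketch of how a successfully completed `null` future could serve as the unload marker. It is not the PR's code: `java.util.concurrent.CompletableFuture` and `ConcurrentHashMap` stand in for Cassandra's `Future`/`AsyncPromise` and `NonBlockingHashMap`, the class name `NullMarkerSketch` is made up for illustration, and the checked `UnloadExecutionException` wrapping is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for the class under review; not the PR's code.
class NullMarkerSketch<K, V>
{
    private final ConcurrentHashMap<K, CompletableFuture<V>> map = new ConcurrentHashMap<>();

    V loadIfAbsent(K key, Supplier<? extends V> loader)
    {
        while (true)
        {
            CompletableFuture<V> fresh = new CompletableFuture<>();
            CompletableFuture<V> existing = map.putIfAbsent(key, fresh);
            if (existing == null)
            {
                // We installed the entry, so this thread runs the loader.
                try
                {
                    V v = loader.get();
                    if (v == null)
                        throw new NullPointerException("The load function returned null");
                    fresh.complete(v);
                    return v;
                }
                catch (RuntimeException | Error e)
                {
                    fresh.completeExceptionally(e);
                    map.remove(key, fresh); // allow a later retry
                    throw e;
                }
            }

            // Another thread owns the entry. A null result means either the unload
            // marker or a failed concurrent load; in both cases we try again.
            V v = valueOrNull(existing);
            if (v != null)
                return v;
            Thread.yield();
        }
    }

    V unloadIfPresent(K key, Consumer<? super V> unloader)
    {
        // The marker is a future that completed normally with null; no exception type is needed.
        CompletableFuture<V> marker = CompletableFuture.completedFuture(null);
        CompletableFuture<V> existing;
        do
        {
            existing = map.get(key);
            if (existing == null)
                return null;
        } while (!map.replace(key, existing, marker));

        try
        {
            V v = valueOrNull(existing);
            if (v != null)
                unloader.accept(v);
            return v;
        }
        finally
        {
            map.remove(key, marker);
        }
    }

    boolean containsKey(K key)
    {
        return map.containsKey(key);
    }

    // Waits for the future and maps a failed completion to null.
    private static <T> T valueOrNull(CompletableFuture<T> f)
    {
        try
        {
            return f.join();
        }
        catch (CompletionException e)
        {
            return null;
        }
    }
}
```

With this shape the load loop only needs a null check to decide whether to retry, so no dedicated marker exception has to be defined or inspected.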

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();
+        else
+            return null;
+    }
+
+    /**
+     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
+     * It is guaranteed that the load function will be executed only once when the key is missing and multiple threads
+     * call this method for the same key.
+     * <p>
+     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
+     * throws an exception, it is rethrown by this method. In both cases nothing gets added to the map.
+     */
+    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
+    {
+        while (true)
+        {
+            Future<V> future = get(key);
+            AsyncPromise<V> newEntry = null;
+            if (future == null)
+            {
+                newEntry = new AsyncPromise<>();
+                future = putIfAbsent(key, newEntry);
+                if (future == null)
+                {
+                    // We managed to create an entry for the value. Now initialize it.
+                    future = newEntry;
+                    try
+                    {
+                        V v = loadFunction.get();
+                        if (v == null)
+                        {
+                            newEntry.setFailure(new NullPointerException("The mapping function returned null"));
+                            remove(key, future);
+                        }
+                        else
+                        {
+                            newEntry.setSuccess(v);
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        newEntry.setFailure(t);
+                        // Remove future so that construction can be retried later
+                        remove(key, future);
+                    }
+                }
+                else
+                {
+                    newEntry = null;
+                }
+
+                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
+            }
+
+            try
+            {
+                future.syncUninterruptibly();
+            }
+            catch (Throwable t)
+            {
+                // if blockingUnloadIfPresent was called in the meantime, we simply retry hoping that unloading gets
+                // finished soon
+                // also we retry if the concurrent attempt to load entry failed (but we do not retry if this attempt
+                // failed)
+                if (newEntry == null || Throwables.isCausedBy(t, ex -> ex instanceof KeyNotFoundException))
+                {
+                    Thread.yield();
+                    continue;
+                }
+            }
+
+            future.rethrowIfFailed();
+            return future.getNow();
+        }
+    }
+
+    /**
+     * If a value for the given key is present, unload function is run and the value is removed from the map.
+     * Similarly to {@link #blockingLoadIfAbsent(Object, Supplier)} at-most-once semantics is guaranteed for unload
+     * function.
+     * <p>
+     * When unload function fails, the value is removed from the map anyway and the failure is rethrown.
+     * <p>
+     * When the key was not found, the method returns {@code null}.
+     *
+     * @throws UnloadExecutionException when the unloading failed to complete - this is a checked exception because
+     *                                  the value is removed from the map regardless of the result of unloading; therefore if the unloading failed, the
+     *                                  caller is responsible for handling that; the {@link UnloadExecutionException} encapsulates the value which
+     *                                  failed to unload.
+     */
+    public V blockingUnloadIfPresent(K key, Consumer<? super V> unloadFunction) throws UnloadExecutionException
+    {
+        Promise<V> droppedFuture = new AsyncPromise<V>().setFailure(new KeyNotFoundException());
+
+        Future<V> existingFuture;
+        do
+        {
+            existingFuture = get(key);
+            if (existingFuture == null || existingFuture.cause() != null)
+                return null;
+        } while (!replace(key, existingFuture, droppedFuture));
+
+        V v = existingFuture.awaitUninterruptibly().getNow();
+        if (v == null)
+        {
+            // which means that either the value failed to load or a concurrent attempt to unload already did the work
+            return null;

Review comment:
       At this point we have already replaced the entry with our `droppedFuture`. Returning here skips the `finally` block below, so the `droppedFuture` is never removed from the map.
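
One way this path seems reachable: a load is still in flight when `replace` swaps in the marker, and the load then fails. The loader's `remove(key, future)` no longer matches the entry, and the early `return null` happens before the `try`/`finally` that removes the marker. The sketch below shows one possible fix, under the assumption that widening the `try`/`finally` to cover every exit after the swap is acceptable. `CompletableFuture` and `ConcurrentMap` stand in for Cassandra's `Future` and `NonBlockingHashMap`, and `UnloadCleanupSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical helper, not the PR's code.
class UnloadCleanupSketch
{
    static <K, V> V unloadIfPresent(ConcurrentMap<K, CompletableFuture<V>> map, K key, Consumer<? super V> unloader)
    {
        // Failed future used as the "unload in progress" marker, as in the PR.
        CompletableFuture<V> dropped = new CompletableFuture<>();
        dropped.completeExceptionally(new IllegalStateException("unload in progress"));

        CompletableFuture<V> existing;
        do
        {
            existing = map.get(key);
            if (existing == null || existing.isCompletedExceptionally())
                return null; // nothing has been swapped in yet, so there is nothing to clean up
        } while (!map.replace(key, existing, dropped));

        // From here on the marker is in the map, so every exit path must remove it.
        try
        {
            V v;
            try
            {
                v = existing.join();
            }
            catch (CompletionException e)
            {
                v = null; // the load failed after we swapped the marker in
            }
            if (v != null)
                unloader.accept(v);
            return v;
        }
        finally
        {
            map.remove(key, dropped);
        }
    }
}
```

Without the removal on the null path, later load attempts for the same key would presumably keep finding the stale marker and retry indefinitely.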

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>

Review comment:
       This is not a very good name, especially if what we are doing is adding blocking methods that are the only way this class is meant to be used. It doesn't seem to make much sense to extend the map either (yes, wrapping it instead has a performance cost, but this is not that hot a path).
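
A rough sketch of the composition alternative, assuming the map is held as a private field and only the loading operations are exposed. The name `LoadingMap` is hypothetical, `ConcurrentHashMap` and `CompletableFuture` stand in for `NonBlockingHashMap` and Cassandra's `Future`, and the retry logic of the PR is deliberately simplified away.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical name and shape; the PR may settle on something different.
final class LoadingMap<K, V>
{
    // Held as a field rather than inherited, so callers cannot reach
    // put/replace/remove on the raw futures and bypass the load protocol.
    private final ConcurrentMap<K, CompletableFuture<V>> delegate = new ConcurrentHashMap<>();

    V loadIfAbsent(K key, Supplier<? extends V> loader)
    {
        CompletableFuture<V> fresh = new CompletableFuture<>();
        CompletableFuture<V> existing = delegate.putIfAbsent(key, fresh);
        if (existing != null)
            return existing.join(); // simplified: waits, but does not retry after a failed concurrent load

        try
        {
            V v = Objects.requireNonNull(loader.get(), "The load function returned null");
            fresh.complete(v);
            return v;
        }
        catch (RuntimeException | Error e)
        {
            fresh.completeExceptionally(e);
            delegate.remove(key, fresh); // nothing stays in the map after a failure
            throw e;
        }
    }

    int size()
    {
        return delegate.size();
    }
}
```

The extra indirection is one field access per call, which matches the remark that this is not a hot path.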

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();
+        else
+            return null;
+    }
+
+    /**
+     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
+     * It is guaranteed that the load function will be executed only once when the key is missing and multiple threads
+     * call this method for the same key.
+     * <p>
+     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
+     * throws an exception, it is rethrown by this method. In both cases nothing gets added to the map.
+     */
+    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
+    {
+        while (true)
+        {
+            Future<V> future = get(key);
+            AsyncPromise<V> newEntry = null;
+            if (future == null)
+            {
+                newEntry = new AsyncPromise<>();
+                future = putIfAbsent(key, newEntry);
+                if (future == null)
+                {
+                    // We managed to create an entry for the value. Now initialize it.
+                    future = newEntry;
+                    try
+                    {
+                        V v = loadFunction.get();
+                        if (v == null)
+                        {
+                            newEntry.setFailure(new NullPointerException("The mapping function returned null"));

Review comment:
       I would just throw here and let the exception handling deal with removal and passing on the failure.
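
A small sketch of what that could look like: the null check throws inside the `try`, and the single `catch` performs both the failure completion and the removal. `CompletableFuture` and `ConcurrentMap` are stand-ins for Cassandra's `AsyncPromise` and `NonBlockingHashMap`, and `SingleFailurePathSketch.initialize` is a hypothetical helper that isolates the branch in question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper, not the PR's code.
class SingleFailurePathSketch
{
    // Called by the thread whose putIfAbsent installed `entry` for `key`.
    static <K, V> void initialize(ConcurrentMap<K, CompletableFuture<V>> map, K key,
                                  CompletableFuture<V> entry, Supplier<? extends V> loader)
    {
        try
        {
            V v = loader.get();
            if (v == null)
                throw new NullPointerException("The mapping function returned null");
            entry.complete(v);
        }
        catch (Throwable t)
        {
            // One place handles both a null result and a loader failure.
            entry.completeExceptionally(t);
            // Remove the entry so that construction can be retried later.
            map.remove(key, entry);
        }
    }
}
```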

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();

Review comment:
       Isn't the await superfluous?

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)

Review comment:
       This is not really a blocking method. Perhaps `getIfPresent()`?
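
A sketch of the renamed, non-waiting accessor, under the assumption that "present" means "loaded successfully". `CompletableFuture` stands in for Cassandra's `Future` here; because `CompletableFuture.getNow` rethrows a failure, the sketch checks `isCompletedExceptionally()` first, a step the PR's `Future` may not need. `GetIfPresentSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, not the PR's code.
class GetIfPresentSketch
{
    // Returns the loaded value, or null if the key is missing, still loading, or failed to load.
    static <K, V> V getIfPresent(ConcurrentMap<K, CompletableFuture<V>> map, K key)
    {
        CompletableFuture<V> future = map.get(key);
        if (future == null || !future.isDone() || future.isCompletedExceptionally())
            return null;
        // The future is already complete, so no await is needed and this cannot block.
        return future.getNow(null);
    }
}
```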

##########
File path: src/java/org/apache/cassandra/utils/concurrent/ExtendedNonBlockingHashMap.java
##########
@@ -0,0 +1,202 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+import org.apache.cassandra.utils.Throwables;
+
+/**
+ * An extension of {@link NonBlockingHashMap} where all values are wrapped by {@link Future}.
+ * <p>
+ * The main purpose of this class is to provide the functionality of concurrent hash map which may perform operations like
+ * {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} and {@link ConcurrentHashMap#computeIfPresent(Object, BiFunction)}
+ * with synchronization scope reduced to the single key - that is, when dealing with a single key, unlike
+ * {@link ConcurrentHashMap} we do not lock the whole map for the time the mapping function is running. This may help
+ * to avoid the case when we want to load/unload a value for a key K1 while loading/unloading a value for a key K2. Such
+ * a scenario is forbidden in case of {@link ConcurrentHashMap} and leads to a deadlock. On the other hand, {@link NonBlockingHashMap}
+ * does not guarantee at-most-once semantics of running the mapping function for a single key.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ExtendedNonBlockingHashMap<K, V> extends NonBlockingHashMap<K, Future<V>>
+{
+
+    /**
+     * Get a value for a given key, waiting for initialization if necessary. This method does not initialize the value
+     * if missing. It returns {@code null} whether the value is missing or failed to initialize.
+     */
+    public V blockingGet(K key)
+    {
+        Future<V> future = get(key);
+        if (future != null && future.isDone())
+            return future.awaitUninterruptibly().getNow();
+        else
+            return null;
+    }
+
+    /**
+     * If the value for the given key is missing, execute a load function to obtain a value and put it into the map.
+     * It is guaranteed that the load function will be executed only once when the key is missing and multiple threads
+     * call this method for the same key.
+     * <p>
+     * When the mapping function returns {@code null}, {@link NullPointerException} is thrown. When the mapping function
+     * throws an exception, it is rethrown by this method. In both cases nothing gets added to the map.
+     */
+    public V blockingLoadIfAbsent(K key, Supplier<? extends V> loadFunction) throws RuntimeException
+    {
+        while (true)
+        {
+            Future<V> future = get(key);
+            AsyncPromise<V> newEntry = null;
+            if (future == null)
+            {
+                newEntry = new AsyncPromise<>();
+                future = putIfAbsent(key, newEntry);
+                if (future == null)
+                {
+                    // We managed to create an entry for the value. Now initialize it.
+                    future = newEntry;
+                    try
+                    {
+                        V v = loadFunction.get();
+                        if (v == null)
+                        {
+                            newEntry.setFailure(new NullPointerException("The mapping function returned null"));
+                            remove(key, future);
+                        }
+                        else
+                        {
+                            newEntry.setSuccess(v);
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        newEntry.setFailure(t);
+                        // Remove future so that construction can be retried later
+                        remove(key, future);
+                    }
+                }
+                else
+                {
+                    newEntry = null;
+                }
+
+                // Else some other thread beat us to it, but we now have the reference to the future which we can wait for.
+            }
+
+            try
+            {
+                future.syncUninterruptibly();
+            }
+            catch (Throwable t)
+            {
+                // if blockingUnloadIfPresent was called in the meantime, we simply retry hoping that unloading gets
+                // finished soon
+                // also we retry if the concurrent attempt to load entry failed (but we do not retry if this attempt
+                // failed)
+                if (newEntry == null || Throwables.isCausedBy(t, ex -> ex instanceof KeyNotFoundException))
+                {
+                    Thread.yield();
+                    continue;
+                }
+            }
+
+            future.rethrowIfFailed();

Review comment:
       Why not do this by calling `throw t` at the end of the `catch` case above?
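
A sketch of the suggested control flow, isolated into a hypothetical helper `RethrowInCatchSketch.awaitOrRetry`. Assumptions: `CompletableFuture.join()` stands in for `syncUninterruptibly()`, a `null` return stands in for the `continue` of the PR's `while` loop, and the `KeyNotFoundException` check is omitted for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper, not the PR's code.
class RethrowInCatchSketch
{
    // ownsEntry is true when the calling thread ran the loader itself (newEntry != null in the PR).
    // Returns the loaded value, or null to tell the caller's loop to retry.
    static <V> V awaitOrRetry(CompletableFuture<V> future, boolean ownsEntry)
    {
        try
        {
            return future.join();
        }
        catch (CompletionException e)
        {
            if (!ownsEntry)
            {
                // A concurrent attempt failed; give it time to clean up, then retry.
                Thread.yield();
                return null;
            }
            // Our own attempt failed: rethrow right here, so no separate
            // rethrowIfFailed() call is needed after the try/catch.
            throw e;
        }
    }
}
```

Throwing from the `catch` keeps the success path as a plain return and leaves only one place where a failure can leave the method.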




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


