Posted to user@flink.apache.org by Aashutosh Swarnakar <se...@gmail.com> on 2021/03/29 12:59:17 UTC

Flink State Query Server threads stuck in infinite loop with high GC activity on CopyOnWriteStateMap get

Hi Folks,



I've recently started using Flink for a pilot project where I need to
aggregate event counts on a per-minute window basis. The state has been made
queryable so that external services can query it via Flink's queryable
state API. I am using the memory state backend with a keyed process function
and map state.
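
For illustration only, the per-minute counting with a get followed by a put
can be sketched in plain Java, without any Flink dependency. This is not the
job's actual code: the class and method names are hypothetical, and a plain
HashMap stands in for the keyed map state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for per-minute counting; a HashMap replaces Flink map state. */
final class MinuteCountSketch {
    static final long MINUTE_MS = 60_000L;

    // minute start (epoch millis) -> number of events seen in that minute
    private final Map<Long, Long> countsByMinute = new HashMap<>();

    /** Rounds a timestamp down to the start of its minute. */
    static long minuteStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, MINUTE_MS);
    }

    /** One get and one put per element, as described in the post. */
    void onEvent(long timestampMillis) {
        long bucket = minuteStart(timestampMillis);
        Long current = countsByMinute.get(bucket);
        countsByMinute.put(bucket, current == null ? 1L : current + 1L);
    }

    /** Returns the count for the minute containing the timestamp, or 0 if none. */
    long countFor(long timestampMillis) {
        return countsByMinute.getOrDefault(minuteStart(timestampMillis), 0L);
    }

    public static void main(String[] args) {
        MinuteCountSketch counts = new MinuteCountSketch();
        counts.onEvent(0L);
        counts.onEvent(59_999L);
        counts.onEvent(60_000L);
        System.out.println(counts.countFor(30_000L));
        System.out.println(counts.countFor(60_001L));
    }
}
```

Used from a single thread, this pattern is unproblematic; the sketch says
nothing about what happens when other threads read the same map.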



I have a simple job running on a 6-node Flink standalone cluster: 1 job
manager and 5 task managers. External services can query the 5 task manager
nodes for Flink state.



The job operates fine as long as no external clients are querying Flink
state. Once external clients start querying the state via the Flink
queryable state client, I observe that the Flink query server threads and the
aggregate task thread get stuck in an infinite loop in the
CopyOnWriteStateMap.get() method. GC activity also peaks at 100%, along
with 100% CPU usage. The task manager nodes are unable to recover from this
situation and I have to restart the cluster. Let me know if anybody has
faced this issue before.



Any information regarding the questions below would be very helpful.



1. Is this a thread synchronisation issue?

2. Is the CopyOnWriteStateMap class thread safe?

3. Is there a possibility of race conditions when incremental
rehashing is done for CopyOnWriteStateMap?

4. Can this be an issue with state usage in my job implementation? (I am
doing a get and a put on map state for each element processed in the stream.)





I have added the thread dump below, along with the code snippet where the
threads go into the infinite loop.



Task thread:

"aggregates-stream -> Map -> Sink: Cassandra Sink (2/10)#0" - Thread t@76
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:275)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:136)
        at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:86)
        at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
        at com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:44)
        at com.cybersource.risk.operator.ProcessAggregatesFunction.processElement(ProcessAggregatesFunction.java:20)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$202/2001022910.runDefaultAction(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None



Flink State Query Server Threads:

"Flink Queryable State Server Thread 3" - Thread t@136
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.incrementalRehash(CopyOnWriteStateMap.java:680)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.computeHashForOperationAndDoIncrementalRehash(CopyOnWriteStateMap.java:645)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:270)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:222)
        at org.apache.flink.runtime.state.heap.HeapMapState.getSerializedValue(HeapMapState.java:188)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:115)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:88)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
        at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:258)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <32ba4c7d> (a java.util.concurrent.ThreadPoolExecutor$Worker)


"Flink Queryable State Server Thread 1" - Thread t@130
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:275)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:222)
        at org.apache.flink.runtime.state.heap.HeapMapState.getSerializedValue(HeapMapState.java:188)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:115)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:88)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
        at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:258)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <78af0f0d> (a java.util.concurrent.ThreadPoolExecutor$Worker)


"Flink Queryable State Server Thread 0" - Thread t@129
   java.lang.Thread.State: RUNNABLE
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:275)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:262)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:222)
        at org.apache.flink.runtime.state.heap.HeapMapState.getSerializedValue(HeapMapState.java:188)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.getSerializedValue(KvStateServerHandler.java:115)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:88)
        at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:48)
        at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:258)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - locked <139b2e11> (a java.util.concurrent.ThreadPoolExecutor$Worker)







Code snippet where threads get stuck (CopyOnWriteStateMap.java):

// For loop in get method

    public S get(K key, N namespace) {

        final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
        final int requiredVersion = highestRequiredSnapshotVersion;
        final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
        int index = hash & (tab.length - 1);

        for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
            final K eKey = e.key;
            final N eNamespace = e.namespace;
            if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {

                // copy-on-write check for state
                if (e.stateVersion < requiredVersion) {
                    // copy-on-write check for entry
                    if (e.entryVersion < requiredVersion) {
                        e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
                    }
                    e.stateVersion = stateMapVersion;
                    e.state = getStateSerializer().copy(e.state);
                }

                return e.state;
            }
        }

        return null;
    }


// "while (e != null) {" loop in incremental rehash method

    private void incrementalRehash() {

        StateMapEntry<K, N, S>[] oldMap = primaryTable;
        StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;

        int oldCapacity = oldMap.length;
        int newMask = newMap.length - 1;
        int requiredVersion = highestRequiredSnapshotVersion;
        int rhIdx = rehashIndex;
        int transferred = 0;

        // we migrate a certain minimum amount of entries from the old to the new table
        while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {

            StateMapEntry<K, N, S> e = oldMap[rhIdx];

            while (e != null) {
                // copy-on-write check for entry
                if (e.entryVersion < requiredVersion) {
                    e = new StateMapEntry<>(e, stateMapVersion);
                }
                StateMapEntry<K, N, S> n = e.next;
                int pos = e.hash & newMask;
                e.next = newMap[pos];
                newMap[pos] = e;
                e = n;
                ++transferred;
            }

            oldMap[rhIdx] = null;
            if (++rhIdx == oldCapacity) {
                // here, the rehash is complete and we release resources and reset fields
                primaryTable = newMap;
                incrementalRehashTable = (StateMapEntry<K, N, S>[]) EMPTY_TABLE;
                primaryTableSize += incrementalRehashTableSize;
                incrementalRehashTableSize = 0;
                rehashIndex = 0;
                return;
            }
        }

        // sync our local bookkeeping the with official bookkeeping fields
        primaryTableSize -= transferred;
        incrementalRehashTableSize += transferred;
        rehashIndex = rhIdx;
    }



Any help on this issue is highly appreciated.


Thanks,
Aashutosh

Re: Flink State Query Server threads stuck in infinite loop with high GC activity on CopyOnWriteStateMap get

Posted by Till Rohrmann <tr...@apache.org>.
Hi Aashutosh,

The queryable state feature is no longer actively maintained by the
community. What I would recommend is to output the aggregate counts via a
sink to some key-value store, which you then query to obtain the results.
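
As a rough sketch of that shape (not a Flink sink implementation), the job
upserts each finished per-minute count into a store, and external services
read from the store instead of from Flink state. Everything named here is
hypothetical: the class, the methods and the key format are made up for
illustration, and an in-process ConcurrentHashMap stands in for whatever
external key-value store is actually used.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an external key-value store holding per-minute counts. */
final class CountStoreSketch {
    // ConcurrentHashMap is safe for one writer (the job) and many readers (clients).
    private final Map<String, Long> store = new ConcurrentHashMap<>();

    /** Assumed key layout: event key plus the start of the minute in epoch millis. */
    static String rowKey(String eventKey, long minuteStartMillis) {
        return eventKey + "@" + minuteStartMillis;
    }

    /** Called from the job's sink side: overwrite the count for this key and minute. */
    void upsert(String eventKey, long minuteStartMillis, long count) {
        store.put(rowKey(eventKey, minuteStartMillis), count);
    }

    /** Called by external services: read the latest count, or 0 if none was written. */
    long lookup(String eventKey, long minuteStartMillis) {
        return store.getOrDefault(rowKey(eventKey, minuteStartMillis), 0L);
    }

    public static void main(String[] args) {
        CountStoreSketch kv = new CountStoreSketch();
        kv.upsert("login", 60_000L, 5L);
        kv.upsert("login", 60_000L, 7L); // a later update replaces the earlier value
        System.out.println(kv.lookup("login", 60_000L));
    }
}
```

The point of the design is that readers never touch the job's internal state
tables, so they cannot interfere with the task thread.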

Looking at the implementation of CopyOnWriteStateMap, it does not look as if
this class is supposed to be accessed concurrently. I suspect that
this is the cause of the infinite loop you are seeing. I think the problem
is that this class was implemented after the development of queryable
state had stopped. Sorry for the inconvenience.
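
To illustrate how unsynchronized access can end in an endless loop, here is a
small sketch. It is not Flink code: it replays, in a single thread and with
hypothetical names, one possible interleaving of two callers that run the
same relink pattern as the inner loop of incrementalRehash() against a shared
target bucket. Whether this exact interleaving occurred in the reported job
is an assumption.

```java
/** Single-threaded replay of a two-caller race on a chained hash bucket (illustrative only). */
final class RehashRaceSketch {

    static final class Node {
        final String key;
        Node next;

        Node(String key, Node next) {
            this.key = key;
            this.next = next;
        }
    }

    /** Floyd's tortoise-and-hare check: true if following next never reaches null. */
    static boolean hasCycle(Node head) {
        Node slow = head;
        Node fast = head;
        while (fast != null && fast.next != null) {
            slow = slow.next;
            fast = fast.next.next;
            if (slow == fast) {
                return true;
            }
        }
        return false;
    }

    /** Moves every entry of a chain to the front of a bucket, like the inner rehash loop. */
    static Node transferAll(Node e, Node bucketHead) {
        while (e != null) {
            Node n = e.next;
            e.next = bucketHead;
            bucketHead = e;
            e = n;
        }
        return bucketHead;
    }

    /** Replays: caller A reads its entry and pauses, caller B finishes, caller A resumes. */
    static Node simulateInterleaving() {
        Node b = new Node("b", null);
        Node a = new Node("a", b);       // old chain: a -> b
        Node[] newTable = new Node[1];   // shared target bucket

        Node eA = a;                     // caller A has picked up entry a, then is paused

        newTable[0] = transferAll(a, newTable[0]); // caller B moves everything: b -> a

        eA.next = newTable[0];           // caller A resumes: a.next = b while b.next == a
        newTable[0] = eA;
        return newTable[0];
    }

    public static void main(String[] args) {
        Node head = simulateInterleaving();
        System.out.println("cycle in bucket: " + hasCycle(head));
    }
}
```

Once a chain contains such a cycle, any loop of the form
"for (e = tab[index]; e != null; e = e.next)" over that bucket no longer
terminates for a key that is not found.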

I have also pulled in PengFei Li, the author of CopyOnWriteStateMap, who
might be able to give more details.

Cheers,
Till
