Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/06/29 08:54:47 UTC

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6228

    [FLINK-9491] Implement timer data structure based on RocksDB

    ## What is the purpose of the change
    
    This PR is another step towards integrating the timer state with the keyed state backends.
    
    First, the PR generalizes the data structure `InternalTimerHeap` to `InternalPriorityQueue` so that the functionality of heap-and-set-organized state is decoupled from storing timers. The main reason for this is that state-backend-related code lives in flink-runtime, while timers are a concept from flink-streaming.
    
    Second, the PR also introduces an implementation of `InternalPriorityQueue` with set semantics (i.e. the data structure we require to manage timers) that is based on RocksDB. State in RocksDB is always partitioned into key-groups, so the general idea is to organize the implementation as a heap-of-heaps: each sub-heap holds the elements of exactly one key-group, and the super-heap merges them by priority across key-group boundaries. The implementation reuses the in-memory implementation of `InternalPriorityQueue` (without set properties) as the super-heap that holds the sub-heaps. Furthermore, each sub-heap is an instance of `CachingInternalPriorityQueueSet`, consisting of a "fast", "small" cache (`OrderedSetCache`) and a "slow", "unbounded" store (`OrderedSetStore`), currently with simple write-through synchronization between cache and store. In the current implementation, the cache is based on an AVL tree and restricted in capacity, while the store is backed by a RocksDB column family. We use caching to reduce read accesses to RocksDB.
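    
    To make the heap-of-heaps idea concrete, here is a minimal, self-contained sketch built only from JDK classes. It is not the PR's code: the PR's `HeapPriorityQueue` replaces the naive remove/re-insert below with `adjustModifiedElement()` (exploiting the element indexes tracked via `HeapPriorityQueueElement`), and key-group assignment really goes through `KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups)` rather than the modulo stand-in.
    
    ```java
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;
    
    public class HeapOfHeapsSketch {
    
        public static void main(String[] args) {
            final int numKeyGroups = 3;
            final List<PriorityQueue<Long>> subQueues = new ArrayList<>();
            for (int i = 0; i < numKeyGroups; i++) {
                subQueues.add(new PriorityQueue<>());
            }
    
            // The super-heap orders the sub-queues by their head elements;
            // empty sub-queues sort last.
            final PriorityQueue<PriorityQueue<Long>> superHeap = new PriorityQueue<>(
                Comparator.comparing((PriorityQueue<Long> q) ->
                    q.peek() == null ? Long.MAX_VALUE : q.peek()));
            superHeap.addAll(subQueues);
    
            add(superHeap, subQueues, 42L, numKeyGroups);
            add(superHeap, subQueues, 7L, numKeyGroups);
            add(superHeap, subQueues, 13L, numKeyGroups);
    
            // poll() always serves the sub-queue with the globally smallest head.
            while (!superHeap.peek().isEmpty()) {
                PriorityQueue<Long> headQueue = superHeap.poll();
                System.out.println(headQueue.poll()); // prints 7, 13, 42
                superHeap.add(headQueue); // naive stand-in for adjustModifiedElement()
            }
        }
    
        private static void add(
            PriorityQueue<PriorityQueue<Long>> superHeap,
            List<PriorityQueue<Long>> subQueues,
            long element,
            int numKeyGroups) {
    
            // Stand-in for KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups).
            PriorityQueue<Long> target = subQueues.get((int) (element % numKeyGroups));
            superHeap.remove(target); // naive O(n); the real super-heap adjusts in place
            target.add(element);
            superHeap.add(target);
        }
    }
    ```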
    
    Please note that the RocksDB implementation is currently not yet integrated with the timer service or the backend. This will happen in the next steps.
    
    ## Brief change log
    
    - Refactored `InternalTimerHeap` to decouple it from timers, moved the data structures from flink-streaming to flink-runtime (-> `InternalPriorityQueue`).
    - Split the data structure into a hierarchy: a heap without set semantics (`HeapPriorityQueue`) and a heap extended with set semantics (`HeapPriorityQueueSet`).
    - Introduced a RocksDB-based implementation of `InternalPriorityQueue` with set semantics. Starting point is `KeyGroupPartitionedPriorityQueue`. This class uses a `HeapPriorityQueue` of `CachingInternalPriorityQueueSet` elements that each contain the elements for exactly one key-group (heap-of-heaps). For RocksDB, we configure each `CachingInternalPriorityQueueSet` to use a `TreeOrderedSetCache` and a `RocksDBOrderedStore` (see the sketch below).
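    
    As a rough model of the `CachingInternalPriorityQueueSet` composition (write-through, with the cache holding the smallest elements), consider the sketch below. It only illustrates the stated design: the PR's `OrderedSetCache`/`OrderedSetStore` interfaces, the AVL-tree cache (`TreeOrderedSetCache`), and the RocksDB-backed store (`RocksDBOrderedStore`) will differ in their exact shape, and a `TreeSet` stands in for both tiers here.
    
    ```java
    import java.util.Iterator;
    import java.util.TreeSet;
    
    public class CachingOrderedSetSketch {
    
        private static final int CACHE_CAPACITY = 4;
    
        private final TreeSet<Long> cache = new TreeSet<>(); // "fast", "small"
        private final TreeSet<Long> store = new TreeSet<>(); // "slow", "unbounded"
    
        // All cached elements are smaller than this boundary; every element that
        // exists only in the store is greater than or equal to it.
        private long cacheBoundary = Long.MAX_VALUE;
    
        public void add(long element) {
            store.add(element); // write-through: the store always holds all elements
            if (element < cacheBoundary) {
                cache.add(element);
                if (cache.size() > CACHE_CAPACITY) {
                    cacheBoundary = cache.pollLast(); // evict and remember the boundary
                }
            }
        }
    
        public Long pollFirst() {
            final Long head = cache.pollFirst();
            if (head == null) {
                return null; // empty cache implies empty store, see refill below
            }
            store.remove(head);
            if (cache.isEmpty()) {
                refillCacheFromStore(); // in RocksDB terms: an ordered range scan
            }
            return head;
        }
    
        private void refillCacheFromStore() {
            final Iterator<Long> iterator = store.iterator();
            while (iterator.hasNext() && cache.size() < CACHE_CAPACITY) {
                cache.add(iterator.next());
            }
            cacheBoundary = iterator.hasNext() ? iterator.next() : Long.MAX_VALUE;
        }
    
        public static void main(String[] args) {
            CachingOrderedSetSketch set = new CachingOrderedSetSketch();
            for (long element : new long[] {5, 3, 9, 1, 7, 2, 8}) {
                set.add(element);
            }
            for (Long head; (head = set.pollFirst()) != null; ) {
                System.out.print(head + " "); // prints: 1 2 3 5 7 8 9
            }
        }
    }
    ```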
    
    
    ## Verifying this change
    
    I added dedicated tests for all data structures.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes, fastutil)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink heapAbstractionsRocks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6228.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6228
    
----
commit b3261a15bdf15207f50e2832a048fb3c84b8f642
Author: Stefan Richter <s....@...>
Date:   2018-06-19T08:01:30Z

    Introduce MAX_ARRAY_SIZE as general constant

commit dd64cbb7eb15317a4dc8f0626c50dffd58e6b5f9
Author: Stefan Richter <s....@...>
Date:   2018-06-18T12:38:01Z

    Generalization of timer queue to a queue(set) that is no longer coupled to timers and implementation for RocksDB

----


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816139
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.IOUtils;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +
    +/**
    + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
    + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
    + * semantics.
    + *
    + * @param <T> the type of elements in the queue.
    + * @param <PQ> the type of sub-queue used for each key-group partition.
    + */
    +public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +	implements InternalPriorityQueue<T> {
    +
    +	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
    +	@Nonnull
    +	private final HeapPriorityQueue<PQ> keyGroupHeap;
    +
    +	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
    +	@Nonnull
    +	private final PQ[] keyGroupLists;
    +
    +	/** Function to extract the key from contained elements. */
    +	@Nonnull
    +	private final KeyExtractorFunction<T> keyExtractor;
    +
    +	/** The total number of key-groups (in the job). */
    +	@Nonnegative
    +	private final int totalKeyGroups;
    +
    +	/** The smallest key-group id with a subpartition managed by this ordered set. */
    +	@Nonnegative
    +	private final int firstKeyGroup;
    +
    +	@SuppressWarnings("unchecked")
    +	public KeyGroupPartitionedPriorityQueue(
    +		@Nonnull KeyExtractorFunction<T> keyExtractor,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalKeyGroups) {
    +
    +		this.keyExtractor = keyExtractor;
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +		this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
    +		this.keyGroupHeap = new HeapPriorityQueue<>(
    +			new InternalPriorityQueueComparator<>(elementComparator),
    +			keyGroupRange.getNumberOfKeyGroups());
    +		for (int i = 0; i < keyGroupLists.length; i++) {
    +			final PQ keyGroupCache =
    +				orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
    +			keyGroupLists[i] = keyGroupCache;
    +			keyGroupHeap.add(keyGroupCache);
    +		}
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T poll() {
    +		final PQ headList = keyGroupHeap.peek();
    +		final T head = headList.poll();
    +		keyGroupHeap.adjustModifiedElement(headList);
    +		return head;
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T peek() {
    +		return keyGroupHeap.peek().peek();
    +	}
    +
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		final PQ list = getListForElementKeyGroup(toAdd);
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.add(toAdd)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toAdd.equals(peek());
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean remove(@Nonnull T toRemove) {
    +		final PQ list = getListForElementKeyGroup(toRemove);
    +
    +		final T oldHead = peek();
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.remove(toRemove)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toRemove.equals(oldHead);
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEmpty() {
    +		return peek() == null;
    +	}
    +
    +	@Override
    +	public int size() {
    +		int sizeSum = 0;
    +		for (PQ list : keyGroupLists) {
    +			sizeSum += list.size();
    +		}
    +		return sizeSum;
    +	}
    +
    +	@Override
    +	public void addAll(@Nullable Collection<? extends T> toAdd) {
    +
    +		if (toAdd == null) {
    +			return;
    +		}
    +
    +		// TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
    +		for (T element : toAdd) {
    +			add(element);
    +		}
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public CloseableIterator<T> iterator() {
    +		return new KeyGroupConcatenationIterator<>(keyGroupLists);
    +	}
    +
    +	private PQ getListForElementKeyGroup(T element) {
    +		return keyGroupLists[computeKeyGroupIndex(element)];
    +	}
    +
    +	private int computeKeyGroupIndex(T element) {
    +		final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
    +		final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
    +		return keyGroupId - firstKeyGroup;
    +	}
    +
    +	/**
    +	 * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator does not guarantee any order of elements.
    +	 * Using code must call {@link #close()} after usage.
    +	 *
    +	 * @param <T> the type of iterated elements.
    +	 */
    +	private static final class KeyGroupConcatenationIterator<
    +		T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +		implements CloseableIterator<T> {
    +
    +		/** Array with the subpartitions that we iterate. No null values in the array. */
    +		@Nonnull
    +		private final PQS[] keyGroupLists;
    +
    +		/** The subpartition that is currently iterated. */
    +		@Nonnegative
    +		private int index;
    +
    +		/** The iterator of the current subpartition. */
    +		@Nonnull
    +		private CloseableIterator<T> current;
    +
    +		private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
    +			this.keyGroupLists = keyGroupLists;
    +			this.index = 0;
    +			this.current = CloseableIterator.empty();
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			boolean currentHasNext = current.hasNext();
    +
    +			// find the iterator of the next partition that has elements.
    +			while (!currentHasNext && index < keyGroupLists.length) {
    +				IOUtils.closeQuietly(current);
    +				current = keyGroupLists[index++].iterator();
    +				currentHasNext = current.hasNext();
    +			}
    +			return currentHasNext;
    +		}
    +
    +		@Override
    +		public T next() {
    +			return current.next();
    +		}
    +
    +		@Override
    +		public void close() throws Exception {
    +			current.close();
    +		}
    +	}
    +
    +	/**
    +	 * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
    +	 * from {@link #peek()}.
    +	 *
    +	 * @param <T> type of the elements in the compared queues.
    +	 * @param <Q> type of queue.
    +	 */
    +	private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
    +		implements Comparator<Q> {
    +
    +		/** Comparator for the queue elements, so we can compare their heads. */
    +		@Nonnull
    +		private final Comparator<T> elementComparator;
    +
    +		InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
    +			this.elementComparator = elementComparator;
    +		}
    +
    +		@Override
    +		public int compare(Q o1, Q o2) {
    +			final T leftTimer = o1.peek();
    +			final T rightTimer = o2.peek();
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6228


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816552
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199812541
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Function to extract a key from a given object.
    + *
    + * @param <T> type of the element from which we extract the key.
    + */
    +@FunctionalInterface
    +public interface KeyExtractorFunction<T> {
    --- End diff --
    
    I find it useful when concepts have names and some form of typing attached to them; otherwise, you end up with a lot of `Function<>` objects and have to think twice about their concrete use-case.
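    
    For illustration, a hedged sketch of that point (the `Timer` class below is hypothetical and only serves the example; the functional method is assumed to return the key as `Object`, as its usage elsewhere in this PR suggests):
    
    ```java
    import java.util.function.Function;
    
    import org.apache.flink.runtime.state.KeyExtractorFunction;
    
    public class KeyExtractorIllustration {
    
        // Hypothetical element type, only for this illustration.
        static class Timer {
            private final long key;
            Timer(long key) { this.key = key; }
            long getKey() { return key; }
        }
    
        // With a plain Function<>, the role is invisible at the use site:
        static final Function<Timer, Object> PLAIN_EXTRACTOR = timer -> timer.getKey();
    
        // The dedicated interface names the concept in every signature that
        // accepts it, and a method reference is enough to implement it:
        static final KeyExtractorFunction<Timer> KEY_EXTRACTOR = Timer::getKey;
    }
    ```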


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199325172
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
    @@ -0,0 +1,277 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.IOUtils;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +
    +/**
    + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
    + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
    + * semantics.
    + *
    + * @param <T> the type of elements in the queue.
    + * @param <PQ> the type of sub-queue used for each key-group partition.
    + */
    +public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +	implements InternalPriorityQueue<T> {
    +
    +	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
    +	@Nonnull
    +	private final HeapPriorityQueue<PQ> keyGroupHeap;
    +
    +	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
    +	@Nonnull
    +	private final PQ[] keyGroupLists;
    +
    +	/** Function to extract the key from contained elements. */
    +	@Nonnull
    +	private final KeyExtractorFunction<T> keyExtractor;
    +
    +	/** The total number of key-groups (in the job). */
    +	@Nonnegative
    +	private final int totalKeyGroups;
    +
    +	/** The smallest key-group id with a subpartition managed by this ordered set. */
    +	@Nonnegative
    +	private final int firstKeyGroup;
    +
    +	@SuppressWarnings("unchecked")
    +	public KeyGroupPartitionedPriorityQueue(
    +		@Nonnull KeyExtractorFunction<T> keyExtractor,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalKeyGroups) {
    +
    +		this.keyExtractor = keyExtractor;
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +		this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
    +		this.keyGroupHeap = new HeapPriorityQueue<>(
    +			new InternalPriorityQueueComparator<>(elementComparator),
    +			keyGroupRange.getNumberOfKeyGroups());
    +		for (int i = 0; i < keyGroupLists.length; i++) {
    +			final PQ keyGroupCache =
    +				orderedCacheFactory.create(firstKeyGroup + i, elementComparator);
    +			keyGroupLists[i] = keyGroupCache;
    +			keyGroupHeap.add(keyGroupCache);
    +		}
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T poll() {
    +		final PQ headList = keyGroupHeap.peek();
    +		final T head = headList.poll();
    +		keyGroupHeap.adjustModifiedElement(headList);
    +		return head;
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T peek() {
    +		return keyGroupHeap.peek().peek();
    +	}
    +
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		final PQ list = getListForElementKeyGroup(toAdd);
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.add(toAdd)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toAdd.equals(peek());
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean remove(@Nonnull T toRemove) {
    +		final PQ list = getListForElementKeyGroup(toRemove);
    +
    +		final T oldHead = peek();
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.remove(toRemove)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toRemove.equals(oldHead);
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEmpty() {
    +		return peek() == null;
    +	}
    +
    +	@Override
    +	public int size() {
    +		int sizeSum = 0;
    +		for (PQ list : keyGroupLists) {
    +			sizeSum += list.size();
    +		}
    +		return sizeSum;
    +	}
    +
    +	@Override
    +	public void addAll(@Nullable Collection<? extends T> toAdd) {
    +
    +		if (toAdd == null) {
    +			return;
    +		}
    +
    +		// TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
    +		for (T element : toAdd) {
    +			add(element);
    +		}
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public CloseableIterator<T> iterator() {
    +		return new KeyGroupConcatenationIterator<>(keyGroupLists);
    +	}
    +
    +	private PQ getListForElementKeyGroup(T element) {
    +		return keyGroupLists[computeKeyGroupIndex(element)];
    +	}
    +
    +	private int computeKeyGroupIndex(T element) {
    +		final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
    +		final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
    +		return keyGroupId - firstKeyGroup;
    +	}
    +
    +	/**
    +	 * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator does not guarantee any order of elements.
    +	 * Using code must call {@link #close()} after usage.
    +	 *
    +	 * @param <T> the type of iterated elements.
    +	 */
    +	private static final class KeyGroupConcatenationIterator<
    +		T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +		implements CloseableIterator<T> {
    +
    +		/** Array with the subpartitions that we iterate. No null values in the array. */
    +		@Nonnull
    +		private final PQS[] keyGroupLists;
    +
    +		/** The subpartition that is currently iterated. */
    +		@Nonnegative
    +		private int index;
    +
    +		/** The iterator of the current subpartition. */
    +		@Nonnull
    +		private CloseableIterator<T> current;
    +
    +		private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
    +			this.keyGroupLists = keyGroupLists;
    +			this.index = 0;
    +			this.current = CloseableIterator.empty();
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			boolean currentHasNext = current.hasNext();
    +
    +			// find the iterator of the next partition that has elements.
    +			while (!currentHasNext && index < keyGroupLists.length) {
    +				IOUtils.closeQuietly(current);
    +				current = keyGroupLists[index++].iterator();
    +				currentHasNext = current.hasNext();
    +			}
    +			return currentHasNext;
    +		}
    +
    +		@Override
    +		public T next() {
    +			return current.next();
    +		}
    +
    +		@Override
    +		public void close() throws Exception {
    +			current.close();
    +		}
    +	}
    +
    +	/**
    +	 * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
    +	 * from {@link #peek()}.
    +	 *
    +	 * @param <T> type of the elements in the compared queues.
    +	 * @param <Q> type of queue.
    +	 */
    +	private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
    +		implements Comparator<Q> {
    +
    +		/** Comparator for the queue elements, so we can compare their heads. */
    +		private final Comparator<T> elementComparator;
    --- End diff --
    
    elementComparator is `@Nonnull`


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199814790
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by a hash
    + * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap and the deduplication set.</li>
    + *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
    + * would be enough if it could return existing elements on unsuccessful adding, etc.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
    +
    +	/**
    +	 * Function to extract the key from contained elements.
    +	 */
    +	private final KeyExtractorFunction<T> keyExtractor;
    +
    +	/**
    +	 * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements.
    +	 */
    +	private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
    +
    +	/**
    +	 * The key-group range of elements that are managed by this queue.
    +	 */
    +	private final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The total number of key-groups of the job.
    +	 */
    +	private final int totalNumberOfKeyGroups;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param keyExtractor function to extract a key from the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 * @param keyGroupRange the key-group range of the elements in this set.
    +	 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueueSet(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor,
    +		@Nonnegative int minimumCapacity,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalNumberOfKeyGroups) {
    +
    +		super(elementComparator, minimumCapacity);
    +
    +		this.keyExtractor = keyExtractor;
    +
    +		this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
    +		this.keyGroupRange = keyGroupRange;
    +
    +		final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
    +		final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
    +		this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
    +		for (int i = 0; i < keyGroupsInLocalRange; ++i) {
    +			deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
    +		}
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		final T toRemove = super.poll();
    +		if (toRemove != null) {
    +			return getDedupMapForElement(toRemove).remove(toRemove);
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
    +	 * no such element is already contained (determined by {@link #equals(Object)}).
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T element) {
    +		return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
    +	}
    +
    +	/**
    +	 * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
    +	 * via {@link #equals(Object)}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T elementToRemove) {
    +		T storedElement = getDedupMapForElement(elementToRemove).remove(elementToRemove);
    +		return storedElement != null && super.remove(storedElement);
    +	}
    +
    +	@Override
    +	public void clear() {
    +		super.clear();
    +		for (HashMap<?, ?> elementHashMap :
    +			deduplicationMapsByKeyGroup) {
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567205
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Set;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +
    +/**
    + * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by a hash
    + * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap and the deduplication set.</li>
    + *  <li>We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set
    + * would be enough if it could return existing elements on unsuccessful adding, etc.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends HeapPriorityQueue<T> {
    +
    +	/**
    +	 * Function to extract the key from contained elements.
    +	 */
    +	private final KeyExtractorFunction<T> keyExtractor;
    +
    +	/**
    +	 * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements.
    +	 */
    +	private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
    +
    +	/**
    +	 * The key-group range of elements that are managed by this queue.
    +	 */
    +	private final KeyGroupRange keyGroupRange;
    +
    +	/**
    +	 * The total number of key-groups of the job.
    +	 */
    +	private final int totalNumberOfKeyGroups;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param keyExtractor function to extract a key from the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 * @param keyGroupRange the key-group range of the elements in this set.
    +	 * @param totalNumberOfKeyGroups the total number of key-groups of the job.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueueSet(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor,
    +		@Nonnegative int minimumCapacity,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalNumberOfKeyGroups) {
    +
    +		super(elementComparator, minimumCapacity);
    +
    +		this.keyExtractor = keyExtractor;
    +
    +		this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
    +		this.keyGroupRange = keyGroupRange;
    +
    +		final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups();
    +		final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange;
    +		this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
    +		for (int i = 0; i < keyGroupsInLocalRange; ++i) {
    +			deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize);
    +		}
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		final T toRemove = super.poll();
    +		if (toRemove != null) {
    +			return getDedupMapForElement(toRemove).remove(toRemove);
    +		} else {
    +			return null;
    +		}
    +	}
    +
    +	/**
    +	 * Adds the element to the queue. In contrast to the superclass and to maintain set semantics, this happens only if
    +	 * no such element is already contained (determined by {@link #equals(Object)}).
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T element) {
    +		return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
    +	}
    +
    +	/**
    +	 * In contrast to the superclass and to maintain set semantics, removal here is based on comparing the given element
    +	 * via {@link #equals(Object)}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T elementToRemove) {
    +		T storedElement = getDedupMapForElement(elementToRemove).remove(elementToRemove);
    +		return storedElement != null && super.remove(storedElement);
    +	}
    +
    +	@Override
    +	public void clear() {
    +		super.clear();
    +		for (HashMap<?, ?> elementHashMap :
    +			deduplicationMapsByKeyGroup) {
    --- End diff --
    
    Accidental newline?


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199813559
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
    + * not on equals.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
    +
    +	/**
    +	 * The index of the head element in the array that represents the heap.
    +	 */
    +	private static final int QUEUE_HEAD_INDEX = 1;
    +
    +	/**
    +	 * Comparator for the contained elements.
    +	 */
    +	private final Comparator<T> elementComparator;
    +
    +	/**
    +	 * The array that represents the heap-organized priority queue.
    +	 */
    +	private T[] queue;
    +
    +	/**
    +	 * The current size of the priority queue.
    +	 */
    +	private int size;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueue(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnegative int minimumCapacity) {
    +
    +		this.elementComparator = elementComparator;
    +		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T peek() {
    +		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
    +	}
    +
    +	/**
    +	 * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		return addInternal(toAdd);
    +	}
    +
    +	/**
    +	 * This remove is based on object identity, not the result of equals.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T toStop) {
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806702
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    --- End diff --
    
    It could be useful to insert random add operations while emptying the queue, either as one new test or mixed into the relevant existing tests. That would check that mixed-in additions do not break the order, e.g. one addition every 10 operations whenever the size is greater than 10, to ensure convergence to the empty state.
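    
    A rough sketch of what that could look like, reusing the helpers from this test base (the mixed-in priorities are clamped to not sort before the already-polled prefix, so the expected global order still holds):
    
    ```java
    @Test
    public void testPollAddMixKeepsOrder() {
        InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
        HashSet<TestElement> checkSet = new HashSet<>(100);
        insertRandomTimers(priorityQueue, checkSet, 100);
    
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        long lastPriorityValue = Long.MIN_VALUE;
        int opCount = 0;
        TestElement element;
        while ((element = priorityQueue.poll()) != null) {
            Assert.assertTrue(checkSet.remove(element));
            Assert.assertTrue(element.getPriority() >= lastPriorityValue);
            lastPriorityValue = element.getPriority();
    
            // every 10th operation, mix in an addition while the queue still has
            // more than 10 elements, so that we converge to the empty state.
            if (++opCount % 10 == 0
                    && priorityQueue.size() > 10
                    && lastPriorityValue < Long.MAX_VALUE) {
                TestElement toAdd = new TestElement(
                    random.nextInt(64),
                    random.nextLong(lastPriorityValue, Long.MAX_VALUE));
                if (checkSet.add(toAdd)) {
                    priorityQueue.add(toAdd);
                }
            }
        }
        Assert.assertTrue(priorityQueue.isEmpty());
        Assert.assertTrue(checkSet.isEmpty());
    }
    ```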


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806942
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    +
    +			while ((element = priorityQueue.poll()) != null) {
    +				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
    +				lastPriorityValue = element.getPriority();
    +			}
    +
    +			Assert.assertTrue(priorityQueue.isEmpty());
    +
    +			priorityQueue.addAll(checkSet);
    +		}
    +	}
    +
    +	@Test
    +	public void testPoll() {
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		Assert.assertNull(priorityQueue.poll());
    +
    +		final int testSize = 345;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		while (!priorityQueue.isEmpty()) {
    +			TestElement removed = priorityQueue.poll();
    +			Assert.assertNotNull(removed);
    +			Assert.assertTrue(checkSet.remove(removed));
    +			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = removed.getPriority();
    +		}
    +		Assert.assertTrue(checkSet.isEmpty());
    +
    +		Assert.assertNull(priorityQueue.poll());
    +	}
    +
    +	@Test
    +	public void testIsEmpty() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +
    +		Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
    +		Assert.assertFalse(priorityQueue.isEmpty());
    +
    +		priorityQueue.poll();
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	@Test
    +	public void testBulkAddRestoredTimers() throws Exception {
    --- End diff --
    
    testBulkAddRestored**Elements**


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806903
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    --- End diff --
    
    Could be
    `long lastPriorityValue = removesHead ? element.getPriority() : Long.MIN_VALUE;`


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816118
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199325112
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * Interface for a collection that gives in-order access to elements w.r.t. their priority.
    + *
    + * @param <T> type of elements in the ordered set.
    + */
    +@Internal
    +public interface InternalPriorityQueue<T> {
    +
    +	/**
    +	 * Retrieves and removes the first element (w.r.t. the order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T poll();
    +
    +	/**
    +	 * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T peek();
    +
    +	/**
    +	 * Adds the given element to the set, if it is not already contained.
    +	 *
    +	 * @param toAdd the element to add to the set.
    +	 * @return <code>true</> if the operation changed the head element or if it is unclear if the head element changed.
    --- End diff --
    
    nit: unclosed HTML tag `<code>`
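    
    For reference, the closed form of that line would read:
    
        * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.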


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199814578
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
    + * not on equals.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
    +
    +	/**
    +	 * The index of the head element in the array that represents the heap.
    +	 */
    +	private static final int QUEUE_HEAD_INDEX = 1;
    +
    +	/**
    +	 * Comparator for the contained elements.
    +	 */
    +	private final Comparator<T> elementComparator;
    +
    +	/**
    +	 * The array that represents the heap-organized priority queue.
    +	 */
    +	private T[] queue;
    +
    +	/**
    +	 * The current size of the priority queue.
    +	 */
    +	private int size;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueue(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnegative int minimumCapacity) {
    +
    +		this.elementComparator = elementComparator;
    +		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T peek() {
    +		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
    +	}
    +
    +	/**
    +	 * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		return addInternal(toAdd);
    +	}
    +
    +	/**
    +	 * This remove is based on object identity, not the result of equals.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T toStop) {
    +		return removeInternal(toStop);
    +	}
    +
    +	@Override
    +	public boolean isEmpty() {
    +		return size() == 0;
    +	}
    +
    +	@Override
    +	@Nonnegative
    +	public int size() {
    +		return size;
    +	}
    +
    +	public void clear() {
    +		size = 0;
    +		Arrays.fill(queue, null);
    +	}
    +
    +	@SuppressWarnings({"unchecked"})
    +	@Nonnull
    +	public <O> O[] toArray(O[] out) {
    --- End diff --
    
    It is aligned with the signature from `Collection::toArray(...)`
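    
    For comparison, `java.util.Collection` declares it as:
    
        <T> T[] toArray(T[] a);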


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199813529
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199320146
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---
    @@ -0,0 +1,288 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
    + * two different storage types. The first storage is a (potentially slow) ordered set store that manages the ground truth
    + * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
    + * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
    + * ordered (partial) view on the top elements in the ordered store. We are currently applying simple write-trough
    --- End diff --
    
    typo: `write-trough` -> `write-through`


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199566980
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
    --- End diff --
    
    I would describe it as: based on its index in the internal array; the same should also go into the `remove` doc comment.
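    
    A possible wording for the class doc and the `remove` doc comment (just a sketch):
    
        /**
         * ... Removal addresses an element via the position index it reports through
         * {@link HeapPriorityQueueElement#getInternalIndex()}, i.e. via its slot in the
         * internal array, and not via {@code equals()}.
         */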


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199325523
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    +
    +		outputStream.reset();
    +
    +		try {
    +			outputView.writeShort(keyGroupId);
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	@Override
    +	public void add(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while getting element from RocksDB.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void remove(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.remove(columnFamilyHandle, elementBytes);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
    +		}
    +	}
    +
    +	/**
    +	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
    +	 * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
    +	 *
    +	 * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
    +	 */
    +	@Override
    +	public int size() {
    +
    +		int count = 0;
    +		try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
    +			while (iterator.hasNext()) {
    +				iterator.next();
    +				++count;
    +			}
    +		}
    +
    +		return count;
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public RocksToJavaIteratorAdapter orderedIterator() {
    +
    +		flushWriteBatch();
    +
    +		return new RocksToJavaIteratorAdapter(
    +			new RocksIteratorWrapper(
    +				db.newIterator(columnFamilyHandle, readOptions)));
    +	}
    +
    +	/**
    +	 * Ensures that recent writes are flushed and reflect in the RocksDB instance.
    +	 */
    +	private void flushWriteBatch() {
    +		try {
    +			batchWrapper.flush();
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException(e);
    +		}
    +	}
    +
    +	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
    +		for (int i = 0; i < prefixBytes.length; ++i) {
    +			if (bytes[i] != prefixBytes[i]) {
    +				return false;
    +			}
    +		}
    +		return true;
    +	}
    +
    +	private byte[] serializeElement(T element) {
    +		try {
    +			outputStream.reset();
    +			outputView.writeShort(keyGroupId);
    --- End diff --
    
    nit: Not sure whether we could use `outputView.setPosition(2)` here?
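    
    If the idea is to avoid re-serializing the constant key-group prefix on every call, a
    possible variant of `serializeElement` could look like this (just a sketch, assuming the
    shared `outputStream` still holds this store's prefix bytes at the start of its buffer):
    
        private byte[] serializeElement(T element) {
            try {
                // the first groupPrefixBytes.length bytes are assumed to already contain
                // the serialized key-group id, so only rewind to just behind the prefix
                // instead of writing it again
                outputStream.setPosition(groupPrefixBytes.length);
                byteOrderProducingSerializer.serialize(element, outputView);
                return outputStream.toByteArray();
            } catch (IOException e) {
                throw new FlinkRuntimeException("Error while serializing the element.", e);
            }
        }
    
    This would only be safe as long as no other store for a different key-group writes to the
    shared stream in between, which may be why the prefix is currently written on every call.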


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567018
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Object identification of remove is based on object identity and
    + * not on equals.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
    +
    +	/**
    +	 * The index of the head element in the array that represents the heap.
    +	 */
    +	private static final int QUEUE_HEAD_INDEX = 1;
    +
    +	/**
    +	 * Comparator for the contained elements.
    +	 */
    +	private final Comparator<T> elementComparator;
    +
    +	/**
    +	 * The array that represents the heap-organized priority queue.
    +	 */
    +	private T[] queue;
    +
    +	/**
    +	 * The current size of the priority queue.
    +	 */
    +	private int size;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueue(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnegative int minimumCapacity) {
    +
    +		this.elementComparator = elementComparator;
    +		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T peek() {
    +		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
    +	}
    +
    +	/**
    +	 * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		return addInternal(toAdd);
    +	}
    +
    +	/**
    +	 * This remove is based on object identity, not the result of equals.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T toStop) {
    --- End diff --
    
    Maybe rename method parameter to `toRemove`
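    
    i.e. (just the rename, no behavior change):
    
        @Override
        public boolean remove(@Nonnull T toRemove) {
            return removeInternal(toRemove);
        }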


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199320135
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java ---
    @@ -0,0 +1,288 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * This class is an implementation of a {@link InternalPriorityQueue} with set semantics that internally consists of
    + * two different storage types. The first storage is a (potentially slow) ordered set store that manages the ground truth
    + * about the elements in this queue. The second storage is a (fast) ordered set cache, typically with some limited
    + * capacity. The cache is used to improve performance of accesses to the underlying store and contains contains an
    --- End diff --
    
    duplicated `contains`


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199817513
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    +
    +			while ((element = priorityQueue.poll()) != null) {
    +				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
    +				lastPriorityValue = element.getPriority();
    +			}
    +
    +			Assert.assertTrue(priorityQueue.isEmpty());
    +
    +			priorityQueue.addAll(checkSet);
    +		}
    +	}
    +
    +	@Test
    +	public void testPoll() {
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		Assert.assertNull(priorityQueue.poll());
    +
    +		final int testSize = 345;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		while (!priorityQueue.isEmpty()) {
    +			TestElement removed = priorityQueue.poll();
    +			Assert.assertNotNull(removed);
    +			Assert.assertTrue(checkSet.remove(removed));
    +			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = removed.getPriority();
    +		}
    +		Assert.assertTrue(checkSet.isEmpty());
    +
    +		Assert.assertNull(priorityQueue.poll());
    +	}
    +
    +	@Test
    +	public void testIsEmpty() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +
    +		Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
    +		Assert.assertFalse(priorityQueue.isEmpty());
    +
    +		priorityQueue.poll();
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	@Test
    +	public void testBulkAddRestoredTimers() throws Exception {
    +		final int testSize = 10;
    +		HashSet<TestElement> elementSet = new HashSet<>(testSize);
    +		for (int i = 0; i < testSize; ++i) {
    +			elementSet.add(new TestElement(i, i));
    +		}
    +
    +		List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
    +
    +		for (TestElement testElement : elementSet) {
    +			twoTimesElementSet.add(testElement.deepCopy());
    +			twoTimesElementSet.add(testElement.deepCopy());
    +		}
    +
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		priorityQueue.addAll(twoTimesElementSet);
    +		priorityQueue.addAll(elementSet);
    +
    +		final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
    +
    +		Assert.assertEquals(expectedSize, priorityQueue.size());
    +		try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				if (testSetSemantics()) {
    +					Assert.assertTrue(elementSet.remove(iterator.next()));
    +				} else {
    +					Assert.assertTrue(elementSet.contains(iterator.next()));
    +				}
    +			}
    +		}
    +		if (testSetSemantics()) {
    +			Assert.assertTrue(elementSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testIterator() throws Exception {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		// test empty iterator
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			Assert.assertFalse(iterator.hasNext());
    +			try {
    +				iterator.next();
    +				Assert.fail();
    +			} catch (NoSuchElementException ignore) {
    +			}
    +		}
    +
    +		// iterate some data
    +		final int testSize = 10;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				Assert.assertTrue(checkSet.remove(iterator.next()));
    +			}
    +			Assert.assertTrue(checkSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testAdd() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		TestElement lowPrioElement = new TestElement(4711L, 42L);
    +		TestElement highPrioElement = new TestElement(815L, 23L);
    +		Assert.assertTrue(priorityQueue.add(lowPrioElement));
    +		if (testSetSemantics()) {
    +			priorityQueue.add(lowPrioElement.deepCopy());
    +		}
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertTrue(priorityQueue.add(highPrioElement));
    +		Assert.assertEquals(2, priorityQueue.size());
    +		Assert.assertEquals(highPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertEquals(lowPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(0, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testRemove() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		final long key = 4711L;
    +		final long priorityValue = 42L;
    +		final TestElement testElement = new TestElement(key, priorityValue);
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.add(testElement));
    +		Assert.assertTrue(priorityQueue.remove(testElement));
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
    +
    +	protected abstract boolean testSetSemantics();
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806792
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    --- End diff --
    
    insertRandom**Elements**


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199807071
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    +
    +			while ((element = priorityQueue.poll()) != null) {
    +				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
    +				lastPriorityValue = element.getPriority();
    +			}
    +
    +			Assert.assertTrue(priorityQueue.isEmpty());
    +
    +			priorityQueue.addAll(checkSet);
    +		}
    +	}
    +
    +	@Test
    +	public void testPoll() {
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		Assert.assertNull(priorityQueue.poll());
    +
    +		final int testSize = 345;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		while (!priorityQueue.isEmpty()) {
    +			TestElement removed = priorityQueue.poll();
    +			Assert.assertNotNull(removed);
    +			Assert.assertTrue(checkSet.remove(removed));
    +			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = removed.getPriority();
    +		}
    +		Assert.assertTrue(checkSet.isEmpty());
    +
    +		Assert.assertNull(priorityQueue.poll());
    +	}
    +
    +	@Test
    +	public void testIsEmpty() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +
    +		Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
    +		Assert.assertFalse(priorityQueue.isEmpty());
    +
    +		priorityQueue.poll();
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	@Test
    +	public void testBulkAddRestoredTimers() throws Exception {
    +		final int testSize = 10;
    +		HashSet<TestElement> elementSet = new HashSet<>(testSize);
    +		for (int i = 0; i < testSize; ++i) {
    +			elementSet.add(new TestElement(i, i));
    +		}
    +
    +		List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
    +
    +		for (TestElement testElement : elementSet) {
    +			twoTimesElementSet.add(testElement.deepCopy());
    +			twoTimesElementSet.add(testElement.deepCopy());
    +		}
    +
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		priorityQueue.addAll(twoTimesElementSet);
    +		priorityQueue.addAll(elementSet);
    +
    +		final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
    +
    +		Assert.assertEquals(expectedSize, priorityQueue.size());
    +		try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				if (testSetSemantics()) {
    +					Assert.assertTrue(elementSet.remove(iterator.next()));
    +				} else {
    +					Assert.assertTrue(elementSet.contains(iterator.next()));
    +				}
    +			}
    +		}
    +		if (testSetSemantics()) {
    +			Assert.assertTrue(elementSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testIterator() throws Exception {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		// test empty iterator
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			Assert.assertFalse(iterator.hasNext());
    +			try {
    +				iterator.next();
    +				Assert.fail();
    +			} catch (NoSuchElementException ignore) {
    +			}
    +		}
    +
    +		// iterate some data
    +		final int testSize = 10;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				Assert.assertTrue(checkSet.remove(iterator.next()));
    +			}
    +			Assert.assertTrue(checkSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testAdd() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		TestElement lowPrioElement = new TestElement(4711L, 42L);
    +		TestElement highPrioElement = new TestElement(815L, 23L);
    +		Assert.assertTrue(priorityQueue.add(lowPrioElement));
    +		if (testSetSemantics()) {
    +			priorityQueue.add(lowPrioElement.deepCopy());
    +		}
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertTrue(priorityQueue.add(highPrioElement));
    +		Assert.assertEquals(2, priorityQueue.size());
    +		Assert.assertEquals(highPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertEquals(lowPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(0, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testRemove() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		final long key = 4711L;
    +		final long priorityValue = 42L;
    +		final TestElement testElement = new TestElement(key, priorityValue);
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.add(testElement));
    +		Assert.assertTrue(priorityQueue.remove(testElement));
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
    +
    +	protected abstract boolean testSetSemantics();
    +
    +	/**
    +	 * Payload for usage in the test.
    +	 */
    +	protected static class TestElement implements HeapPriorityQueueElement {
    +
    +		private final long key;
    +		private final long priority;
    +		private int internalIndex;
    +
    +		public TestElement(long key, long priority) {
    +			this.key = key;
    +			this.priority = priority;
    +			this.internalIndex = NOT_CONTAINED;
    +		}
    +
    +		public long getKey() {
    +			return key;
    +		}
    +
    +		public long getPriority() {
    +			return priority;
    +		}
    +
    +		@Override
    +		public int getInternalIndex() {
    +			return internalIndex;
    +		}
    +
    +		@Override
    +		public void setInternalIndex(int newIndex) {
    +			internalIndex = newIndex;
    +		}
    +
    +		@Override
    +		public boolean equals(Object o) {
    +			if (this == o) {
    +				return true;
    +			}
    +			if (o == null || getClass() != o.getClass()) {
    +				return false;
    +			}
    +			TestElement that = (TestElement) o;
    +			return getKey() == that.getKey() &&
    +				getPriority() == that.getPriority();
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return Objects.hash(getKey(), getPriority());
    +		}
    +
    +		public TestElement deepCopy() {
    +			return new TestElement(key, priority);
    +		}
    +	}
    +
    +	/**
    +	 * Serializer for {@link TestElement}. The serialization format produced by this serializer allows lexicographic
    +	 * ordering by {@link TestElement#getPriority}.
    +	 */
    +	protected static class TestElementSerializer extends TypeSerializer<TestElement> {
    --- End diff --
    
    extending `CompositeSerializer` from #6196 might reduce code a bit
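
    Whichever serializer shape ends up being used, the property the testbase relies on (note the `MathUtils` import above) is that the serialized bytes of a `TestElement` sort lexicographically by priority. A minimal sketch of a write path with that property, assuming `MathUtils.flipSignBit` XORs in the sign bit so that signed longs compare correctly as unsigned bytes:
    
    ```java
    // Sketch only: priority first, big-endian, sign bit flipped, so that unsigned
    // lexicographic byte order equals numeric priority order; the key follows as
    // a tie-breaker, matching TEST_ELEMENT_COMPARATOR above.
    @Override
    public void serialize(TestElement record, DataOutputView target) throws IOException {
    	target.writeLong(MathUtils.flipSignBit(record.getPriority()));
    	target.writeLong(MathUtils.flipSignBit(record.getKey()));
    }
    ```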


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199841799
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    --- End diff --
    
    good point 👍 


---

[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6228
  
    @sihuazhou @azagrebin thanks for the reviews! I will merge this once my travis is green.


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199427083
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    +
    +		outputStream.reset();
    +
    +		try {
    +			outputView.writeShort(keyGroupId);
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	@Override
    +	public void add(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while getting element from RocksDB.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void remove(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.remove(columnFamilyHandle, elementBytes);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
    +		}
    +	}
    +
    +	/**
    +	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
    +	 * clear that the value did not change. Currently this is only truly used to realize certain higher-level tests.
    +	 *
    +	 * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
    +	 */
    +	@Override
    +	public int size() {
    +
    +		int count = 0;
    +		try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
    +			while (iterator.hasNext()) {
    +				iterator.next();
    +				++count;
    +			}
    +		}
    +
    +		return count;
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public RocksToJavaIteratorAdapter orderedIterator() {
    +
    +		flushWriteBatch();
    +
    +		return new RocksToJavaIteratorAdapter(
    +			new RocksIteratorWrapper(
    +				db.newIterator(columnFamilyHandle, readOptions)));
    +	}
    +
    +	/**
    +	 * Ensures that recent writes are flushed and reflected in the RocksDB instance.
    +	 */
    +	private void flushWriteBatch() {
    +		try {
    +			batchWrapper.flush();
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException(e);
    +		}
    +	}
    +
    +	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
    +		for (int i = 0; i < prefixBytes.length; ++i) {
    +			if (bytes[i] != prefixBytes[i]) {
    +				return false;
    +			}
    +		}
    +		return true;
    +	}
    +
    +	private byte[] serializeElement(T element) {
    +		try {
    +			outputStream.reset();
    +			outputView.writeShort(keyGroupId);
    --- End diff --
    
    It depends. If we do it like this, we can share the same output stream and view instances across all stores, i.e. all key-groups in the operator. That way, we only have to allocate a single bigger `byte[]` as "write buffer" instead of one per key-group. In the same way, I am later also considering sharing the same write batch.
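
    A minimal sketch of the wiring this enables, assuming a single-threaded keyed backend; `range`, `stores`, `elementSerializer`, and `sharedBatchWrapper` are illustrative names rather than API from this PR:
    
    ```java
    // Illustrative only: all per-key-group stores share one serialization buffer
    // (and one write batch), so the backend allocates a single byte[] overall.
    ByteArrayOutputStreamWithPos sharedOutputStream = new ByteArrayOutputStreamWithPos(128);
    DataOutputViewStreamWrapper sharedOutputView = new DataOutputViewStreamWrapper(sharedOutputStream);
    
    List<RocksDBOrderedStore<T>> stores = new ArrayList<>(range.getNumberOfKeyGroups());
    for (int kg = range.getStartKeyGroup(); kg <= range.getEndKeyGroup(); ++kg) {
    	stores.add(new RocksDBOrderedStore<>(
    		kg,
    		db,
    		columnFamilyHandle,
    		readOptions,
    		elementSerializer,
    		sharedOutputStream, // same buffer instance for every key-group
    		sharedOutputView,   // must wrap the shared stream
    		sharedBatchWrapper));
    }
    ```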


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199325117
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * Interface for a collection that gives in-order access to elements w.r.t. their priority.
    + *
    + * @param <T> type of elements in the ordered set.
    + */
    +@Internal
    +public interface InternalPriorityQueue<T> {
    +
    +	/**
    +	 * Retrieves and removes the first element (w.r.t. the order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T poll();
    +
    +	/**
    +	 * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T peek();
    +
    +	/**
    +	 * Adds the given element to the set, if it is not already contained.
    +	 *
    +	 * @param toAdd the element to add to the set.
    +	 * @return <code>true</> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Only returns <code>false</> iff the head element was not changed by this operation.
    +	 */
    +	boolean add(@Nonnull T toAdd);
    +
    +	/**
    +	 * Removes the given element from the set, if it is contained in the set.
    +	 *
    +	 * @param toRemove the element to remove.
    +	 * @return <code>true</> if the operation changed the head element or if it is unclear whether the head element changed.
    --- End diff --
    
    nit: unclosed HTML tag `<code>`


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199320321
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    --- End diff --
    
    In the `RocksDBKeyedStateBackend` we first compute the `keyGroupPrefixBytes` (1 or 2 bytes), so for what reason do we always use 2 bytes for the key group here?


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199442186
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    --- End diff --
    
    I did it for simplicity, but you are right. Will change it to variable size.
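
    A sketch of what the variable-size variant could look like, assuming it mirrors the backend's rule for the key serialization prefix (1 byte while the total number of key-groups fits into one byte, otherwise 2); the helper below is illustrative, not the final implementation:
    
    ```java
    // Assumption: same rule the RocksDB backend uses for its key-group prefix size.
    private static int computeKeyGroupPrefixBytes(int totalKeyGroups) {
    	return totalKeyGroups > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }
    
    private byte[] createKeyGroupBytes(int keyGroupId, int prefixBytes) {
    	outputStream.reset();
    	try {
    		// write big-endian so lexicographic byte order matches numeric key-group order
    		for (int i = prefixBytes - 1; i >= 0; --i) {
    			outputView.writeByte(keyGroupId >>> (i * Byte.SIZE));
    		}
    	} catch (IOException e) {
    		throw new FlinkRuntimeException("Could not write key-group bytes.", e);
    	}
    	return outputStream.toByteArray();
    }
    ```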


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806862
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    --- End diff --
    
    test**Remove**InsertMixKeepsOrder


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567286
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.IOUtils;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +
    +/**
    + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
    + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
    + * semantics.
    + *
    + * @param <T> the type of elements in the queue.
    + * @param <PQ> the type of sub-queue used for each key-group partition.
    + */
    +public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +	implements InternalPriorityQueue<T> {
    +
    +	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
    +	@Nonnull
    +	private final HeapPriorityQueue<PQ> keyGroupHeap;
    +
    +	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
    +	@Nonnull
    +	private final PQ[] keyGroupLists;
    +
    +	/** Function to extract the key from contained elements. */
    +	@Nonnull
    +	private final KeyExtractorFunction<T> keyExtractor;
    +
    +	/** The total number of key-groups (in the job). */
    +	@Nonnegative
    +	private final int totalKeyGroups;
    +
    +	/** The smallest key-group id with a subpartition managed by this ordered set. */
    +	@Nonnegative
    +	private final int firstKeyGroup;
    +
    +	@SuppressWarnings("unchecked")
    +	public KeyGroupPartitionedPriorityQueue(
    +		@Nonnull KeyExtractorFunction<T> keyExtractor,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
    +		@Nonnull KeyGroupRange keyGroupRange,
    +		@Nonnegative int totalKeyGroups) {
    +
    +		this.keyExtractor = keyExtractor;
    +		this.totalKeyGroups = totalKeyGroups;
    +		this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
    +		this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
    +		this.keyGroupHeap = new HeapPriorityQueue<>(
    +			new InternalPriorityQueueComparator<>(elementComparator),
    +			keyGroupRange.getNumberOfKeyGroups());
    +		for (int i = 0; i < keyGroupLists.length; i++) {
    +			final PQ keyGroupCache =
    +				orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator);
    +			keyGroupLists[i] = keyGroupCache;
    +			keyGroupHeap.add(keyGroupCache);
    +		}
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T poll() {
    +		final PQ headList = keyGroupHeap.peek();
    +		final T head = headList.poll();
    +		keyGroupHeap.adjustModifiedElement(headList);
    +		return head;
    +	}
    +
    +	@Nullable
    +	@Override
    +	public T peek() {
    +		return keyGroupHeap.peek().peek();
    +	}
    +
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		final PQ list = getListForElementKeyGroup(toAdd);
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.add(toAdd)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toAdd.equals(peek());
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean remove(@Nonnull T toRemove) {
    +		final PQ list = getListForElementKeyGroup(toRemove);
    +
    +		final T oldHead = peek();
    +
    +		// the branch checks if the head element has (potentially) changed.
    +		if (list.remove(toRemove)) {
    +			keyGroupHeap.adjustModifiedElement(list);
    +			// could we have a new head?
    +			return toRemove.equals(oldHead);
    +		} else {
    +			// head unchanged
    +			return false;
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEmpty() {
    +		return peek() == null;
    +	}
    +
    +	@Override
    +	public int size() {
    +		int sizeSum = 0;
    +		for (PQ list : keyGroupLists) {
    +			sizeSum += list.size();
    +		}
    +		return sizeSum;
    +	}
    +
    +	@Override
    +	public void addAll(@Nullable Collection<? extends T> toAdd) {
    +
    +		if (toAdd == null) {
    +			return;
    +		}
    +
    +		// TODO consider bulk loading the partitions and "heapify" keyGroupHeap once after all elements are inserted.
    +		for (T element : toAdd) {
    +			add(element);
    +		}
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public CloseableIterator<T> iterator() {
    +		return new KeyGroupConcatenationIterator<>(keyGroupLists);
    +	}
    +
    +	private PQ getListForElementKeyGroup(T element) {
    +		return keyGroupLists[computeKeyGroupIndex(element)];
    +	}
    +
    +	private int computeKeyGroupIndex(T element) {
    +		final Object extractKeyFromElement = keyExtractor.extractKeyFromElement(element);
    +		final int keyGroupId = KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
    +		return keyGroupId - firstKeyGroup;
    +	}
    +
    +	/**
    +	 * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator does not guarantee any order of elements.
    +	 * Using code must call {@link #close()} after usage.
    +	 *
    +	 * @param <T> the type of iterated elements.
    +	 */
    +	private static final class KeyGroupConcatenationIterator<
    +		T, PQS extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +		implements CloseableIterator<T> {
    +
    +		/** Array with the subpartitions that we iterate. No null values in the array. */
    +		@Nonnull
    +		private final PQS[] keyGroupLists;
    +
    +		/** The subpartition that is currently iterated. */
    +		@Nonnegative
    +		private int index;
    +
    +		/** The iterator of the current subpartition. */
    +		@Nonnull
    +		private CloseableIterator<T> current;
    +
    +		private KeyGroupConcatenationIterator(@Nonnull PQS[] keyGroupLists) {
    +			this.keyGroupLists = keyGroupLists;
    +			this.index = 0;
    +			this.current = CloseableIterator.empty();
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			boolean currentHasNext = current.hasNext();
    +
    +			// find the iterator of the next partition that has elements.
    +			while (!currentHasNext && index < keyGroupLists.length) {
    +				IOUtils.closeQuietly(current);
    +				current = keyGroupLists[index++].iterator();
    +				currentHasNext = current.hasNext();
    +			}
    +			return currentHasNext;
    +		}
    +
    +		@Override
    +		public T next() {
    +			return current.next();
    +		}
    +
    +		@Override
    +		public void close() throws Exception {
    +			current.close();
    +		}
    +	}
    +
    +	/**
    +	 * Comparator that compares {@link InternalPriorityQueue} objects by their head element. Must handle null results
    +	 * from {@link #peek()}.
    +	 *
    +	 * @param <T> type of the elements in the compared queues.
    +	 * @param <Q> type of queue.
    +	 */
    +	private static final class InternalPriorityQueueComparator<T, Q extends InternalPriorityQueue<T>>
    +		implements Comparator<Q> {
    +
    +		/** Comparator for the queue elements, so we can compare their heads. */
    +		@Nonnull
    +		private final Comparator<T> elementComparator;
    +
    +		InternalPriorityQueueComparator(@Nonnull Comparator<T> elementComparator) {
    +			this.elementComparator = elementComparator;
    +		}
    +
    +		@Override
    +		public int compare(Q o1, Q o2) {
    +			final T leftTimer = o1.peek();
    +			final T rightTimer = o2.peek();
    --- End diff --
    
    `xTimer` -> `xElement`, probably left over from timer heap


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199815863
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.IOUtils;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +
    +/**
    + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
    + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
    + * semantics.
    + *
    + * @param <T> the type of elements in the queue.
    + * @param <PQ> the type of sub-queue used for each key-group partition.
    + */
    +public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +	implements InternalPriorityQueue<T> {
    +
    +	/** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/
    +	@Nonnull
    +	private final HeapPriorityQueue<PQ> keyGroupHeap;
    +
    +	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
    +	@Nonnull
    +	private final PQ[] keyGroupLists;
    --- End diff --
    
    👍 


---

[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6228
  
    CC @azagrebin  @sihuazhou 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199812074
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * Interface for a collection that gives in-order access to elements w.r.t. their priority.
    + *
    + * @param <T> type of elements in the ordered set.
    + */
    +@Internal
    +public interface InternalPriorityQueue<T> {
    +
    +	/**
    +	 * Retrieves and removes the first element (w.r.t. the order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T poll();
    +
    +	/**
    +	 * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T peek();
    +
    +	/**
    +	 * Adds the given element to the set, if it is not already contained.
    +	 *
    +	 * @param toAdd the element to add to the set.
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Returns <code>false</code> only if the head element was definitely not changed by this operation.
    +	 */
    +	boolean add(@Nonnull T toAdd);
    +
    +	/**
    +	 * Removes the given element from the set, if it is contained in the set.
    +	 *
    +	 * @param toRemove the element to remove.
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Returns <code>false</code> only if the head element was definitely not changed by this operation.
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567420
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedSetStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    --- End diff --
    
    An empty value might take less space: `new byte[] {}`, or, if that is not supported by RocksDB, maybe the simpler `new byte[] { 0 }`.
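    
    In code, the suggestion amounts to one of these constants (a sketch; whether RocksDB accepts zero-length values is an assumption to verify):
    
    ```java
    // Truly empty value, assuming RocksDB accepts zero-length values:
    private static final byte[] DUMMY_BYTES = new byte[0];
    
    // Fallback if it does not: a single zero byte (same size as "0".getBytes(...),
    // but arguably clearer about its intent):
    // private static final byte[] DUMMY_BYTES = new byte[] { 0 };
    ```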


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199816750
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    --- End diff --
    
    👍 


---

[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6228
  
    @sihuazhou thanks for the fast review. I addressed all your comments.


---

[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6228
  
    +1 from my side


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199426052
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    +
    +		outputStream.reset();
    +
    +		try {
    +			outputView.writeShort(keyGroupId);
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	@Override
    +	public void add(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void remove(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.remove(columnFamilyHandle, elementBytes);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
    +		}
    +	}
    +
    +	/**
    +	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
    +	 * clear that the value did not change. Currently, it is really only used by certain higher-level tests.
    +	 *
    +	 * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
    +	 */
    +	@Override
    +	public int size() {
    +
    +		int count = 0;
    +		try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
    +			while (iterator.hasNext()) {
    +				iterator.next();
    +				++count;
    +			}
    +		}
    +
    +		return count;
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public RocksToJavaIteratorAdapter orderedIterator() {
    +
    +		flushWriteBatch();
    +
    +		return new RocksToJavaIteratorAdapter(
    +			new RocksIteratorWrapper(
    +				db.newIterator(columnFamilyHandle, readOptions)));
    +	}
    +
    +	/**
    +	 * Ensures that recent writes are flushed and reflected in the RocksDB instance.
    +	 */
    +	private void flushWriteBatch() {
    +		try {
    +			batchWrapper.flush();
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException(e);
    +		}
    +	}
    +
    +	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
    +		for (int i = 0; i < prefixBytes.length; ++i) {
    +			if (bytes[i] != prefixBytes[i]) {
    +				return false;
    +			}
    +		}
    +		return true;
    +	}
    +
    +	private byte[] serializeElement(T element) {
    +		try {
    +			outputStream.reset();
    +			outputView.writeShort(keyGroupId);
    +			byteOrderProducingSerializer.serialize(element, outputView);
    +			return outputStream.toByteArray();
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Error while serializing the element.", e);
    +		}
    +	}
    +
    +	private T deserializeElement(byte[] bytes) {
    +		try {
    +			// TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call
    +			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
    +			DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +			inputView.readShort();
    --- End diff --
    
    `skip()` also has a return value that should be checked (in general, maybe not for this particular stream that we use), similar to what `skipBytes(...)` does. I think still doing the check is cleaner, and then we can just as well use `readShort`, which is also easier to identify as the counterpart of the corresponding `writeShort(...)`.
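    
    As a sketch, the two variants under discussion would look roughly like this (hypothetical code, assuming the 2-byte key-group prefix written by `writeShort(...)`):
    
    ```java
    // Variant A: skip the prefix and check the return value, since skip()
    // may legally skip fewer bytes than requested.
    if (inputView.skip(2) != 2) {
        throw new IOException("Could not skip the key-group prefix.");
    }
    
    // Variant B: readShort() consumes exactly two bytes or throws EOFException,
    // and visibly mirrors the writeShort(...) on the serialization path.
    inputView.readShort();
    ```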


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199566880
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import javax.annotation.Nonnull;
    +
    +/**
    + * Function to extract a key from a given object.
    + *
    + * @param <T> type of the element from which we extract the key.
    + */
    +@FunctionalInterface
    +public interface KeyExtractorFunction<T> {
    --- End diff --
    
    Is there a particular reason to create a named interface? It could be just `Function<T, Object>`.
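    
    For comparison, a sketch of the two shapes (hypothetical variable names; the extractor assignment mirrors the one used in the test base of this PR):
    
    ```java
    // Named variant, as in the PR: a domain-specific, self-documenting type.
    KeyExtractorFunction<TestElement> named = TestElement::getKey;
    
    // Suggested variant: reuses the JDK interface, at the cost of a less
    // descriptive type at call sites (getKey()'s long is boxed to Object).
    java.util.function.Function<TestElement, Object> plain = TestElement::getKey;
    ```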


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567240
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.runtime.state.KeyExtractorFunction;
    +import org.apache.flink.runtime.state.KeyGroupRange;
    +import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.IOUtils;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.Comparator;
    +
    +/**
    + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and
    + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set
    + * semantics.
    + *
    + * @param <T> the type of elements in the queue.
    + * @param <PQ> the type of sub-queue used for each key-group partition.
    + */
    +public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
    +	implements InternalPriorityQueue<T> {
    +
    +	/** A heap of heap sets. Each sub-heap represents the partition for a key-group. */
    +	@Nonnull
    +	private final HeapPriorityQueue<PQ> keyGroupHeap;
    +
    +	/** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */
    +	@Nonnull
    +	private final PQ[] keyGroupLists;
    --- End diff --
    
    I would suggest renaming it to `KeyGroupQueueSets`, along with `getListForElementKeyGroup` and `keyGroupCache`. Probably a left-over.


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199325190
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    --- End diff --
    
    Does `RocksDBOrderedStore` mean `RocksDBOrderedSetStore`?


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199320254
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java ---
    @@ -0,0 +1,283 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
    +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.FlinkRuntimeException;
    +
    +import org.rocksdb.ColumnFamilyHandle;
    +import org.rocksdb.ReadOptions;
    +import org.rocksdb.RocksDB;
    +import org.rocksdb.RocksDBException;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.util.NoSuchElementException;
    +
    +/**
    + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
    + * based on RocksDB.
    + *
    + * <p>IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences
    + * produced by the provided serializer for the elements!
    + *
    + * @param <T> the type of stored elements.
    + */
    +public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
    +
    +	/** Serialized empty value to insert into RocksDB. */
    +	private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET);
    +
    +	/** The RocksDB instance that serves as store. */
    +	@Nonnull
    +	private final RocksDB db;
    +
    +	/** Handle to the column family of the RocksDB instance in which the elements are stored. */
    +	@Nonnull
    +	private final ColumnFamilyHandle columnFamilyHandle;
    +
    +	/** Read options for RocksDB. */
    +	@Nonnull
    +	private final ReadOptions readOptions;
    +
    +	/**
    +	 * Serializer for the contained elements. The lexicographical order of the bytes of serialized objects must be
    +	 * aligned with their logical order.
    +	 */
    +	@Nonnull
    +	private final TypeSerializer<T> byteOrderProducingSerializer;
    +
    +	/** Wrapper to batch all writes to RocksDB. */
    +	@Nonnull
    +	private final RocksDBWriteBatchWrapper batchWrapper;
    +
    +	/** The key-group id of all elements stored in this instance. */
    +	@Nonnegative
    +	private final int keyGroupId;
    +
    +	/** The key-group id in serialized form. */
    +	@Nonnull
    +	private final byte[] groupPrefixBytes;
    +
    +	/** Output stream that helps to serialize elements. */
    +	@Nonnull
    +	private final ByteArrayOutputStreamWithPos outputStream;
    +
    +	/** Output view that helps to serialize elements, must wrap the output stream. */
    +	@Nonnull
    +	private final DataOutputViewStreamWrapper outputView;
    +
    +	public RocksDBOrderedStore(
    +		@Nonnegative int keyGroupId,
    +		@Nonnull RocksDB db,
    +		@Nonnull ColumnFamilyHandle columnFamilyHandle,
    +		@Nonnull ReadOptions readOptions,
    +		@Nonnull TypeSerializer<T> byteOrderProducingSerializer,
    +		@Nonnull ByteArrayOutputStreamWithPos outputStream,
    +		@Nonnull DataOutputViewStreamWrapper outputView,
    +		@Nonnull RocksDBWriteBatchWrapper batchWrapper) {
    +		this.db = db;
    +		this.columnFamilyHandle = columnFamilyHandle;
    +		this.readOptions = readOptions;
    +		this.byteOrderProducingSerializer = byteOrderProducingSerializer;
    +		this.outputStream = outputStream;
    +		this.outputView = outputView;
    +		this.keyGroupId = keyGroupId;
    +		this.batchWrapper = batchWrapper;
    +		this.groupPrefixBytes = createKeyGroupBytes(keyGroupId);
    +	}
    +
    +	private byte[] createKeyGroupBytes(int keyGroupId) {
    +
    +		outputStream.reset();
    +
    +		try {
    +			outputView.writeShort(keyGroupId);
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Could not write key-group bytes.", e);
    +		}
    +
    +		return outputStream.toByteArray();
    +	}
    +
    +	@Override
    +	public void add(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.put(columnFamilyHandle, elementBytes, DUMMY_BYTES);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while adding element to RocksDB.", e);
    +		}
    +	}
    +
    +	@Override
    +	public void remove(@Nonnull T element) {
    +		byte[] elementBytes = serializeElement(element);
    +		try {
    +			batchWrapper.remove(columnFamilyHandle, elementBytes);
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException("Error while removing element from RocksDB.", e);
    +		}
    +	}
    +
    +	/**
    +	 * This implementation comes at a relatively high cost per invocation. It should not be called repeatedly when it is
    +	 * clear that the value did not change. Currently, it is really only used by certain higher-level tests.
    +	 *
    +	 * @see org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
    +	 */
    +	@Override
    +	public int size() {
    +
    +		int count = 0;
    +		try (final RocksToJavaIteratorAdapter iterator = orderedIterator()) {
    +			while (iterator.hasNext()) {
    +				iterator.next();
    +				++count;
    +			}
    +		}
    +
    +		return count;
    +	}
    +
    +	@Nonnull
    +	@Override
    +	public RocksToJavaIteratorAdapter orderedIterator() {
    +
    +		flushWriteBatch();
    +
    +		return new RocksToJavaIteratorAdapter(
    +			new RocksIteratorWrapper(
    +				db.newIterator(columnFamilyHandle, readOptions)));
    +	}
    +
    +	/**
    +	 * Ensures that recent writes are flushed and reflected in the RocksDB instance.
    +	 */
    +	private void flushWriteBatch() {
    +		try {
    +			batchWrapper.flush();
    +		} catch (RocksDBException e) {
    +			throw new FlinkRuntimeException(e);
    +		}
    +	}
    +
    +	private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
    +		for (int i = 0; i < prefixBytes.length; ++i) {
    +			if (bytes[i] != prefixBytes[i]) {
    +				return false;
    +			}
    +		}
    +		return true;
    +	}
    +
    +	private byte[] serializeElement(T element) {
    +		try {
    +			outputStream.reset();
    +			outputView.writeShort(keyGroupId);
    +			byteOrderProducingSerializer.serialize(element, outputView);
    +			return outputStream.toByteArray();
    +		} catch (IOException e) {
    +			throw new FlinkRuntimeException("Error while serializing the element.", e);
    +		}
    +	}
    +
    +	private T deserializeElement(byte[] bytes) {
    +		try {
    +			// TODO introduce a stream in which we can change the internal byte[] to avoid creating instances per call
    +			ByteArrayInputStreamWithPos inputStream = new ByteArrayInputStreamWithPos(bytes);
    +			DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
    +			inputView.readShort();
    --- End diff --
    
    Maybe we could use `inputView.skip(2)` here.


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199806997
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    +
    +			while ((element = priorityQueue.poll()) != null) {
    +				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
    +				lastPriorityValue = element.getPriority();
    +			}
    +
    +			Assert.assertTrue(priorityQueue.isEmpty());
    +
    +			priorityQueue.addAll(checkSet);
    +		}
    +	}
    +
    +	@Test
    +	public void testPoll() {
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		Assert.assertNull(priorityQueue.poll());
    +
    +		final int testSize = 345;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		while (!priorityQueue.isEmpty()) {
    +			TestElement removed = priorityQueue.poll();
    +			Assert.assertNotNull(removed);
    +			Assert.assertTrue(checkSet.remove(removed));
    +			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = removed.getPriority();
    +		}
    +		Assert.assertTrue(checkSet.isEmpty());
    +
    +		Assert.assertNull(priorityQueue.poll());
    +	}
    +
    +	@Test
    +	public void testIsEmpty() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +
    +		Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
    +		Assert.assertFalse(priorityQueue.isEmpty());
    +
    +		priorityQueue.poll();
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	@Test
    +	public void testBulkAddRestoredTimers() throws Exception {
    +		final int testSize = 10;
    +		HashSet<TestElement> elementSet = new HashSet<>(testSize);
    +		for (int i = 0; i < testSize; ++i) {
    +			elementSet.add(new TestElement(i, i));
    +		}
    +
    +		List<TestElement> twoTimesElementSet = new ArrayList<>(elementSet.size() * 2);
    +
    +		for (TestElement testElement : elementSet) {
    +			twoTimesElementSet.add(testElement.deepCopy());
    +			twoTimesElementSet.add(testElement.deepCopy());
    +		}
    +
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		priorityQueue.addAll(twoTimesElementSet);
    +		priorityQueue.addAll(elementSet);
    +
    +		final int expectedSize = testSetSemantics() ? elementSet.size() : 3 * elementSet.size();
    +
    +		Assert.assertEquals(expectedSize, priorityQueue.size());
    +		try (final CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				if (testSetSemantics()) {
    +					Assert.assertTrue(elementSet.remove(iterator.next()));
    +				} else {
    +					Assert.assertTrue(elementSet.contains(iterator.next()));
    +				}
    +			}
    +		}
    +		if (testSetSemantics()) {
    +			Assert.assertTrue(elementSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testIterator() throws Exception {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		// test empty iterator
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			Assert.assertFalse(iterator.hasNext());
    +			try {
    +				iterator.next();
    +				Assert.fail();
    +			} catch (NoSuchElementException ignore) {
    +			}
    +		}
    +
    +		// iterate some data
    +		final int testSize = 10;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +		try (CloseableIterator<TestElement> iterator = priorityQueue.iterator()) {
    +			while (iterator.hasNext()) {
    +				Assert.assertTrue(checkSet.remove(iterator.next()));
    +			}
    +			Assert.assertTrue(checkSet.isEmpty());
    +		}
    +	}
    +
    +	@Test
    +	public void testAdd() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		TestElement lowPrioElement = new TestElement(4711L, 42L);
    +		TestElement highPrioElement = new TestElement(815L, 23L);
    +		Assert.assertTrue(priorityQueue.add(lowPrioElement));
    +		if (testSetSemantics()) {
    +			priorityQueue.add(lowPrioElement.deepCopy());
    +		}
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertTrue(priorityQueue.add(highPrioElement));
    +		Assert.assertEquals(2, priorityQueue.size());
    +		Assert.assertEquals(highPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(1, priorityQueue.size());
    +		Assert.assertEquals(lowPrioElement, priorityQueue.poll());
    +		Assert.assertEquals(0, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testRemove() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		final long key = 4711L;
    +		final long priorityValue = 42L;
    +		final TestElement testElement = new TestElement(key, priorityValue);
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.add(testElement));
    +		Assert.assertTrue(priorityQueue.remove(testElement));
    +		if (testSetSemantics()) {
    +			Assert.assertFalse(priorityQueue.remove(testElement));
    +		}
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	protected abstract InternalPriorityQueue<TestElement> newPriorityQueue(int initialCapacity);
    +
    +	protected abstract boolean testSetSemantics();
    --- End diff --
    
    Maybe `testSetSemanticsUniqueElements`, or something similar, would be more readable.


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199817158
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Testbase for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// check that the whole set is still in order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    --- End diff --
    
    👍 


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199567167
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java ---
    @@ -0,0 +1,341 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.heap;
    +
    +import org.apache.flink.runtime.state.InternalPriorityQueue;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.NoSuchElementException;
    +
    +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
    +
    +/**
    + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes
    + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is
    + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array
    + * index computations a bit simpler in the hot methods. Removal identifies elements by object identity, not by
    + * equals.
    + *
    + * <p>Possible future improvements:
    + * <ul>
    + *  <li>We could also implement shrinking for the heap.</li>
    + * </ul>
    + *
    + * @param <T> type of the contained elements.
    + */
    +public class HeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
    +
    +	/**
    +	 * The index of the head element in the array that represents the heap.
    +	 */
    +	private static final int QUEUE_HEAD_INDEX = 1;
    +
    +	/**
    +	 * Comparator for the contained elements.
    +	 */
    +	private final Comparator<T> elementComparator;
    +
    +	/**
    +	 * The array that represents the heap-organized priority queue.
    +	 */
    +	private T[] queue;
    +
    +	/**
    +	 * The current size of the priority queue.
    +	 */
    +	private int size;
    +
    +	/**
    +	 * Creates an empty {@link HeapPriorityQueue} with the requested initial capacity.
    +	 *
    +	 * @param elementComparator comparator for the contained elements.
    +	 * @param minimumCapacity the minimum and initial capacity of this priority queue.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public HeapPriorityQueue(
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnegative int minimumCapacity) {
    +
    +		this.elementComparator = elementComparator;
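    +		// size the array with one extra slot: element indexes start at QUEUE_HEAD_INDEX (1), so index 0 stays unused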
    +		this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T poll() {
    +		return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null;
    +	}
    +
    +	@Override
    +	@Nullable
    +	public T peek() {
    +		return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
    +	}
    +
    +	/**
    +	 * Adds the given element to the heap. This element should not be managed by any other {@link HeapPriorityQueue}.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean add(@Nonnull T toAdd) {
    +		return addInternal(toAdd);
    +	}
    +
    +	/**
    +	 * This remove is based on object identity, not the result of equals.
    +	 *
    +	 * @return <code>true</code> if the operation changed the head element or if it is unclear whether the head element changed.
    +	 * Returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	@Override
    +	public boolean remove(@Nonnull T toStop) {
    +		return removeInternal(toStop);
    +	}
    +
    +	@Override
    +	public boolean isEmpty() {
    +		return size() == 0;
    +	}
    +
    +	@Override
    +	@Nonnegative
    +	public int size() {
    +		return size;
    +	}
    +
    +	public void clear() {
    +		size = 0;
    +		Arrays.fill(queue, null);
    +	}
    +
    +	@SuppressWarnings({"unchecked"})
    +	@Nonnull
    +	public <O> O[] toArray(O[] out) {
    --- End diff --
    
    Why introduce another arbitrary type parameter `O`? `T` seems to work, at least for the tests.
    Also, `out` should be annotated @Nonnull.
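    
    For illustration, a `T`-typed variant could look like the following minimal sketch (assuming the fields `queue`, `size`, and `QUEUE_HEAD_INDEX` from this class; the unchecked cast mirrors what `java.util.ArrayList#toArray` does):
    
    	@SuppressWarnings("unchecked")
    	@Nonnull
    	public T[] toArray(@Nonnull T[] out) {
    		if (out.length < size) {
    			// allocate a new array with the same runtime component type as "out"
    			out = (T[]) java.lang.reflect.Array.newInstance(out.getClass().getComponentType(), size);
    		}
    		// contained elements start at QUEUE_HEAD_INDEX (1) in the backing array
    		System.arraycopy(queue, QUEUE_HEAD_INDEX, out, 0, size);
    		return out;
    	}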


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199817285
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
    @@ -0,0 +1,452 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.util.CloseableIterator;
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.TestLogger;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.NoSuchElementException;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +/**
    + * Test base for implementations of {@link InternalPriorityQueue}.
    + */
    +public abstract class InternalPriorityQueueTestBase extends TestLogger {
    +
    +	protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2);
    +	protected static final KeyExtractorFunction<TestElement> KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
    +	protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
    +		Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
    +
    +	protected static void insertRandomTimers(
    +		@Nonnull InternalPriorityQueue<TestElement> priorityQueue,
    +		@Nonnull Set<TestElement> checkSet,
    +		int count) {
    +
    +		ThreadLocalRandom localRandom = ThreadLocalRandom.current();
    +
    +		final int numUniqueKeys = Math.max(count / 4, 64);
    +
    +		long duplicatePriority = Long.MIN_VALUE;
    +
    +		for (int i = 0; i < count; ++i) {
    +			TestElement element;
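    +			// draw random elements until we produce one that is not yet in the check set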
    +			do {
    +				long elementPriority;
    +				if (duplicatePriority == Long.MIN_VALUE) {
    +					elementPriority = localRandom.nextLong();
    +				} else {
    +					elementPriority = duplicatePriority;
    +					duplicatePriority = Long.MIN_VALUE;
    +				}
    +				element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
    +			} while (!checkSet.add(element));
    +
    +			if (localRandom.nextInt(10) == 0) {
    +				duplicatePriority = element.getPriority();
    +			}
    +
    +			final boolean headChangedIndicated = priorityQueue.add(element);
    +			if (element.equals(priorityQueue.peek())) {
    +				Assert.assertTrue(headChangedIndicated);
    +			}
    +		}
    +		Assert.assertEquals(count, priorityQueue.size());
    +	}
    +
    +	@Test
    +	public void testPeekPollOrder() {
    +		final int initialCapacity = 4;
    +		final int testSize = 1000;
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(initialCapacity);
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		int lastSize = priorityQueue.size();
    +		Assert.assertEquals(testSize, lastSize);
    +		TestElement testElement;
    +		while ((testElement = priorityQueue.peek()) != null) {
    +			Assert.assertFalse(priorityQueue.isEmpty());
    +			Assert.assertEquals(lastSize, priorityQueue.size());
    +			Assert.assertEquals(testElement, priorityQueue.poll());
    +			Assert.assertTrue(checkSet.remove(testElement));
    +			Assert.assertTrue(testElement.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = testElement.getPriority();
    +			--lastSize;
    +		}
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +		Assert.assertEquals(0, priorityQueue.size());
    +		Assert.assertEquals(0, checkSet.size());
    +	}
    +
    +	@Test
    +	public void testStopInsertMixKeepsOrder() {
    +
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		final int testSize = 128;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		// repeatedly remove one element and verify that the remaining queue still polls in priority order
    +		while (!checkSet.isEmpty()) {
    +
    +			Iterator<TestElement> iterator = checkSet.iterator();
    +			TestElement element = iterator.next();
    +			iterator.remove();
    +
    +			boolean removesHead = element.equals(priorityQueue.peek());
    +			if (removesHead) {
    +				Assert.assertTrue(priorityQueue.remove(element));
    +			} else {
    +				priorityQueue.remove(element);
    +			}
    +			Assert.assertEquals(checkSet.size(), priorityQueue.size());
    +
    +			long lastPriorityValue = Long.MIN_VALUE;
    +
    +			while ((element = priorityQueue.poll()) != null) {
    +				Assert.assertTrue(element.getPriority() >= lastPriorityValue);
    +				lastPriorityValue = element.getPriority();
    +			}
    +
    +			Assert.assertTrue(priorityQueue.isEmpty());
    +
    +			priorityQueue.addAll(checkSet);
    +		}
    +	}
    +
    +	@Test
    +	public void testPoll() {
    +		InternalPriorityQueue<TestElement> priorityQueue = newPriorityQueue(3);
    +
    +		Assert.assertNull(priorityQueue.poll());
    +
    +		final int testSize = 345;
    +		HashSet<TestElement> checkSet = new HashSet<>(testSize);
    +		insertRandomTimers(priorityQueue, checkSet, testSize);
    +
    +		long lastPriorityValue = Long.MIN_VALUE;
    +		while (!priorityQueue.isEmpty()) {
    +			TestElement removed = priorityQueue.poll();
    +			Assert.assertNotNull(removed);
    +			Assert.assertTrue(checkSet.remove(removed));
    +			Assert.assertTrue(removed.getPriority() >= lastPriorityValue);
    +			lastPriorityValue = removed.getPriority();
    +		}
    +		Assert.assertTrue(checkSet.isEmpty());
    +
    +		Assert.assertNull(priorityQueue.poll());
    +	}
    +
    +	@Test
    +	public void testIsEmpty() {
    +		InternalPriorityQueue<TestElement> priorityQueue =
    +			newPriorityQueue(1);
    +
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +
    +		Assert.assertTrue(priorityQueue.add(new TestElement(4711L, 42L)));
    +		Assert.assertFalse(priorityQueue.isEmpty());
    +
    +		priorityQueue.poll();
    +		Assert.assertTrue(priorityQueue.isEmpty());
    +	}
    +
    +	@Test
    +	public void testBulkAddRestoredTimers() throws Exception {
    --- End diff --
    
    👍 


---

[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6228
  
    @StefanRRichter very nice PR, I only had some very minor comments.


---

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6228#discussion_r199473489
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.util.CloseableIterator;
    +
    +import javax.annotation.Nonnegative;
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +
    +/**
    + * Interface for a collection that gives in-order access to elements w.r.t. their priority.
    + *
    + * @param <T> type of elements in the ordered set.
    + */
    +@Internal
    +public interface InternalPriorityQueue<T> {
    +
    +	/**
    +	 * Retrieves and removes the first element (w.r.t. the order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T poll();
    +
    +	/**
    +	 * Retrieves, but does not remove, the first element (w.r.t. order) of this set,
    +	 * or returns {@code null} if this set is empty.
    +	 *
    +	 * @return the first element (w.r.t. order) of this ordered set, or {@code null} if this set is empty.
    +	 */
    +	@Nullable
    +	T peek();
    +
    +	/**
    +	 * Adds the given element to the set, if it is not already contained.
    +	 *
    +	 * @param toAdd the element to add to the set.
    +	 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    +	 */
    +	boolean add(@Nonnull T toAdd);
    +
    +	/**
    +	 * Removes the given element from the set, if is contained in the set.
    +	 *
    +	 * @param toRemove the element to remove.
    +	 * @return <code>true</code> if the operation changed the head element or if is it unclear if the head element changed.
    +	 * Only returns <code>false</code> iff the head element was not changed by this operation.
    --- End diff --
    
    typo: should be "if **it is** unclear"
    also, **iff** already implies "only if", so "Only returns ... iff" is redundant :)
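    
    Side note, since the contract is subtle: the returned flag lets callers skip rescheduling work whenever the head is guaranteed unchanged. A hypothetical sketch of such a caller (helper name invented, not from this PR):
    
    	<T> void addAndMaybeRearm(InternalPriorityQueue<T> queue, T element) {
    		// add() returns false iff the head element was certainly not changed
    		if (!queue.add(element)) {
    			return; // the currently armed trigger still points at the correct head
    		}
    		// the head may have changed: re-read it and re-arm the next trigger
    		T head = queue.peek();
    		if (head != null) {
    			rearmTriggerFor(head); // hypothetical helper
    		}
    	}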


---