Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/07/06 16:29:36 UTC

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6276

    [FLINK-9486] Introduce TimerState in keyed state backend

    ## What is the purpose of the change
    
    This PR integrates `InternalTimerQueue` with keyed state backends (Heap and RocksDB), so that we can use the RocksDB-based version in the job for the first time. 
    
    We introduce the interface `KeyGroupPartitionedPriorityQueue` as an easy adapter to existing snapshotting code. This can probably be removed once the queues are fully integrated with the backend's snapshotting, in a followup PR. 
    
    The PR also addresses an issue with the `TreeOrderedCache`, which requires a "proper" `Comparator` (implemented in `TieBreakingPriorityComparator`), and we introduce `PriorityComparator` to put more emphasis on this difference. `TieBreakingPriorityComparator` is likely to go away as well once we come up with improved caching that is not simply based on a tree.
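
To illustrate why a tree-based collection needs a comparator that only returns 0 for equal elements, here is a minimal sketch. It assumes a hypothetical `Timer` element type and two illustrative comparators; none of these names are Flink's actual classes, and the tie-breaker used here (the key) is just one possible choice.

```java
import java.util.Comparator;
import java.util.TreeSet;

final class TieBreakingSketch {

    /** Hypothetical timer-like element: ordered by timestamp, identified by key and timestamp. */
    static final class Timer {
        private final String key;
        private final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        String key() {
            return key;
        }

        long timestamp() {
            return timestamp;
        }
    }

    /** Compares by priority only: two different timers with the same timestamp compare as 0. */
    static final Comparator<Timer> PRIORITY_ONLY = Comparator.comparingLong(Timer::timestamp);

    /** Adds a tie-breaker, so 0 is only returned for timers with the same key and timestamp. */
    static final Comparator<Timer> TIE_BREAKING = PRIORITY_ONLY.thenComparing(Timer::key);

    /** Inserts two distinct timers with equal priority and reports how many the set kept. */
    static int sizeAfterInsert(Comparator<Timer> comparator) {
        TreeSet<Timer> set = new TreeSet<>(comparator);
        set.add(new Timer("a", 42L));
        set.add(new Timer("b", 42L)); // same priority, different element
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterInsert(PRIORITY_ONLY)); // prints 1: the second timer is dropped
        System.out.println(sizeAfterInsert(TIE_BREAKING));  // prints 2: both timers are kept
    }
}
```

With the priority-only comparator, the `TreeSet` treats the second timer as a duplicate, which is the kind of problem a tie-breaking adapter avoids.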
    
    We introduce `PriorityQueueSetFactory` to the keyed state backends, and this is where the queues are built. The current implementation for RocksDB uses an additional RocksDB instance until we are fully integrated with backend snapshotting, because we would otherwise run into trouble with incremental snapshots.
    
    A configuration parameter is introduced to choose the implementation of queues for RocksDB; the default is still the heap variant for now.
    
    Finally, we introduce an additional method for bulk polling in the `InternalTimerQueue` interface that opens up future optimizations.
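
As a rough idea of what bulk polling can look like, the following sketch drains all head elements that satisfy a condition in one call. The helper `bulkPoll`, its parameters, and the use of `java.util.PriorityQueue` are assumptions for illustration only and are not taken from the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class BulkPollSketch {

    /** Hypothetical helper: polls head elements for as long as the predicate accepts them. */
    static <T> void bulkPoll(PriorityQueue<T> queue, Predicate<T> canConsume, Consumer<T> consumer) {
        T head;
        while ((head = queue.peek()) != null && canConsume.test(head)) {
            queue.poll();
            consumer.accept(head);
        }
    }

    /** Collects all timestamps that are due at the given watermark, in priority order. */
    static List<Long> fireUpTo(long watermark, long... timestamps) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        for (long ts : timestamps) {
            timers.add(ts);
        }
        List<Long> fired = new ArrayList<>();
        bulkPoll(timers, ts -> ts <= watermark, fired::add);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(fireUpTo(20L, 30L, 10L, 20L, 40L)); // prints [10, 20]
    }
}
```

A single bulk call gives an implementation room to batch its work, for example deletes against a backing store, instead of handling one `poll()` at a time.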
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `AbstractEventTimeWindowCheckpointingITCase`, but you would currently need to activate it via 
    `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes, if activated)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink integrateSetStateWithBackends

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6276
    
----
commit 0d8743e52a658876425b6cef03fef3fef2d09deb
Author: Stefan Richter <s....@...>
Date:   2018-07-04T11:43:49Z

    Remove read options from RocksDBOrderedSetStore

commit 84b1b36357322cf23d50396cbfa0725db95797ea
Author: Stefan Richter <s....@...>
Date:   2018-07-04T11:51:14Z

    Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue interface to work with the existing snapshotting

commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f
Author: Stefan Richter <s....@...>
Date:   2018-07-04T16:07:54Z

    Basic integration with backends / make Rocks timers work

commit 1294ac356162430cf9de86980de1d4a0154f33b8
Author: Stefan Richter <s....@...>
Date:   2018-07-05T16:46:34Z

    Introduce PriorityComparator and tie breaking variant as adapter to collections that require a comparator.
    
    This is required because the tree set that we use in the cache expects that Comparators are aligned with Object#equals

commit bfd3a12e77348a79c91656d80a7a67ece9825103
Author: Stefan Richter <s....@...>
Date:   2018-07-05T19:35:08Z

    Iterator directly from cache if no store-only elements.

commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8
Author: Stefan Richter <s....@...>
Date:   2018-07-06T08:22:49Z

    Use a dedicated RocksDB instance for priority queue state. We can revert
    this once priority queue state is properly integrated with the
    snapshotting. Until then, we must isolate the priority queue state in
    a separate db or else incremental checkpoints will break.

commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6
Author: Stefan Richter <s....@...>
Date:   2018-07-06T13:55:02Z

    Configuration part

commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8
Author: Stefan Richter <s....@...>
Date:   2018-07-06T14:48:53Z

    Introduce bulk poll method in queue to open up future optimizations

----


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201015258
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestPriorityQueueSetFactory.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Comparator;
    +
    +/**
    + * Test implementation of a {@link PriorityQueueSetFactory}.
    + */
    +public class TestPriorityQueueSetFactory implements PriorityQueueSetFactory {
    +
    +	private final KeyGroupRange keyGroupRange;
    +	private final int totalkeyGroups;
    +
    +	public TestPriorityQueueSetFactory(KeyGroupRange keyGroupRange, int totalKeyGroups) {
    +		this.keyGroupRange = keyGroupRange;
    +		this.totalkeyGroups = totalKeyGroups;
    +	}
    +
    +	@Override
    +	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +		@Nonnull String stateName,
    +		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor) {
    --- End diff --
    
    Please indent one level more


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201017540
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    +
    +		/** Default cache size per key-group. */
    +		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
    +
    +		/** A shared buffer to serialize elements for the priority queue. */
    +		@Nonnull
    +		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
    +
    +		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
    +		@Nonnull
    +		private final DataOutputViewStreamWrapper elementSerializationOutView;
    +
    +		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
    +		@Nonnull
    +		private final RocksDBWriteBatchWrapper writeBatchWrapper;
    +
    +		/** Map to track all column families created to back priority queues. */
    +		@Nonnull
    +		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
    +
    +		RocksDBPriorityQueueFactory() {
    +			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
    +			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
    +			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
    +			this.priorityQueueColumnFamilies = new HashMap<>();
    +		}
    +
    +		@Override
    +		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +			@Nonnull String stateName,
    +			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +			@Nonnull Comparator<T> elementComparator,
    +			@Nonnull KeyExtractorFunction<T> keyExtractor) {
    +
    +			final ColumnFamilyHandle columnFamilyHandle =
    +				priorityQueueColumnFamilies.computeIfAbsent(stateName, RocksDBKeyedStateBackend.this::createColumnFamily);
    +
    +			return new KeyGroupPartitionedPriorityQueue<>(
    +				keyExtractor,
    +				elementComparator,
    +				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
    --- End diff --
    
    I'm not a super huge fan of anonymous internal classes. Could we make this a class instead? I think this would reduce the length of this method.
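
A minimal sketch of the suggested refactoring, assuming a hypothetical `PartitionQueueFactory` interface and a heap-backed implementation (neither is the PR's actual code): the anonymous class becomes a named nested class, so the call site shrinks to a constructor call.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class NamedFactorySketch {

    /** Hypothetical stand-in for a factory that creates one queue per key-group. */
    interface PartitionQueueFactory<T> {
        PriorityQueue<T> create(int keyGroupId, int numKeyGroups, Comparator<T> comparator);
    }

    /** Named implementation that replaces an anonymous class at the call site. */
    static final class HeapPartitionQueueFactory<T> implements PartitionQueueFactory<T> {

        private final int initialCapacity;

        HeapPartitionQueueFactory(int initialCapacity) {
            this.initialCapacity = initialCapacity;
        }

        @Override
        public PriorityQueue<T> create(int keyGroupId, int numKeyGroups, Comparator<T> comparator) {
            // The key-group arguments are unused in this simplified heap variant.
            return new PriorityQueue<>(initialCapacity, comparator);
        }
    }

    /** Call site: a single constructor call instead of an inline anonymous class body. */
    static int smallestOf(int... values) {
        PartitionQueueFactory<Integer> factory = new HeapPartitionQueueFactory<>(16);
        PriorityQueue<Integer> queue = factory.create(0, 1, Comparator.naturalOrder());
        for (int value : values) {
            queue.add(value);
        }
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(smallestOf(3, 1, 2)); // prints 1
    }
}
```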


---

[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6276
  
    CC @tillrohrmann 


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201016908
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    +
    +		/** Default cache size per key-group. */
    +		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
    +
    +		/** A shared buffer to serialize elements for the priority queue. */
    +		@Nonnull
    +		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
    +
    +		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
    +		@Nonnull
    +		private final DataOutputViewStreamWrapper elementSerializationOutView;
    +
    +		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
    +		@Nonnull
    +		private final RocksDBWriteBatchWrapper writeBatchWrapper;
    +
    +		/** Map to track all column families created to back priority queues. */
    +		@Nonnull
    +		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
    +
    +		RocksDBPriorityQueueFactory() {
    +			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
    +			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
    +			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
    +			this.priorityQueueColumnFamilies = new HashMap<>();
    --- End diff --
    
    Initial capacity is missing
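
For illustration, one way to pass an initial capacity is sketched below. The helper name and the expected size of 4 are made up for the example; the sizing formula assumes `HashMap`'s default load factor of 0.75.

```java
import java.util.HashMap;
import java.util.Map;

final class InitialCapacitySketch {

    /** Returns a map sized so that expectedEntries fit without a resize (default load factor 0.75). */
    static <K, V> Map<K, V> newMapWithExpectedSize(int expectedEntries) {
        int initialCapacity = (int) (expectedEntries / 0.75f) + 1;
        return new HashMap<>(initialCapacity);
    }

    public static void main(String[] args) {
        // Hypothetical usage: only a handful of priority queue states are expected.
        Map<String, Integer> columnFamilies = newMapWithExpectedSize(4);
        columnFamilies.put("timers", 1);
        System.out.println(columnFamilies); // prints {timers=1}
    }
}
```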


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201015311
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/TestPriorityQueueSetFactory.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Comparator;
    +
    +/**
    + * Test implementation of a {@link PriorityQueueSetFactory}.
    + */
    +public class TestPriorityQueueSetFactory implements PriorityQueueSetFactory {
    +
    +	private final KeyGroupRange keyGroupRange;
    +	private final int totalkeyGroups;
    +
    +	public TestPriorityQueueSetFactory(KeyGroupRange keyGroupRange, int totalKeyGroups) {
    +		this.keyGroupRange = keyGroupRange;
    +		this.totalkeyGroups = totalKeyGroups;
    +	}
    +
    +	@Override
    +	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +		@Nonnull String stateName,
    +		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor) {
    +		return new HeapPriorityQueueSet<>(elementComparator, keyExtractor, 128,keyGroupRange, totalkeyGroups);
    --- End diff --
    
    Whitespace after `,` is missing.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201017209
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    +
    +		/** Default cache size per key-group. */
    +		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
    +
    +		/** A shared buffer to serialize elements for the priority queue. */
    +		@Nonnull
    +		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
    +
    +		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
    +		@Nonnull
    +		private final DataOutputViewStreamWrapper elementSerializationOutView;
    +
    +		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
    +		@Nonnull
    +		private final RocksDBWriteBatchWrapper writeBatchWrapper;
    +
    +		/** Map to track all column families created to back priority queues. */
    +		@Nonnull
    +		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
    +
    +		RocksDBPriorityQueueFactory() {
    +			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
    +			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
    +			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
    +			this.priorityQueueColumnFamilies = new HashMap<>();
    +		}
    +
    +		@Override
    +		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +			@Nonnull String stateName,
    +			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +			@Nonnull Comparator<T> elementComparator,
    +			@Nonnull KeyExtractorFunction<T> keyExtractor) {
    +
    +			final ColumnFamilyHandle columnFamilyHandle =
    +				priorityQueueColumnFamilies.computeIfAbsent(stateName, RocksDBKeyedStateBackend.this::createColumnFamily);
    +
    +			return new KeyGroupPartitionedPriorityQueue<>(
    +				keyExtractor,
    +				elementComparator,
    +				new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, CachingInternalPriorityQueueSet<T>>() {
    +					@Nonnull
    +					@Override
    +					public CachingInternalPriorityQueueSet<T> create(
    +						int keyGroupId,
    +						int numKeyGroups,
    +						@Nonnull Comparator<T> elementComparator) {
    --- End diff --
    
    Indentation


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201011077
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Set;
    +
    +/**
    + *
    + */
    +@Nonnull
    +public interface KeyGroupedInternalPriorityQueue<T> extends InternalPriorityQueue<T> {
    +	Set<T> getSubsetForKeyGroup(int keyGroupId);
    --- End diff --
    
    Same here for the JavaDocs


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201013659
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Set;
    +
    +/**
    + *
    + */
    +@Nonnull
    +public interface KeyGroupedInternalPriorityQueue<T> extends InternalPriorityQueue<T> {
    +	Set<T> getSubsetForKeyGroup(int keyGroupId);
    --- End diff --
    
    They are all added, in one of the later commits.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201016373
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -394,6 +407,16 @@ public void dispose() {
     		}
     	}
     
    +	@Override
    +	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +		@Nonnull String stateName,
    +		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor) {
    --- End diff --
    
    Indentation


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201016655
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    --- End diff --
    
    Does this need to be an inner class? `RocksDBKeyedStateBackend` is already more than 2500 lines long.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6276


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201014333
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---
    @@ -102,6 +105,21 @@
     			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
     		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
     
    +	@Override
    +	public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +		@Nonnull String stateName,
    +		@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +		@Nonnull Comparator<T> elementComparator,
    +		@Nonnull KeyExtractorFunction<T> keyExtractor) {
    --- End diff --
    
    Wrong indentation.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201016952
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -2579,4 +2604,85 @@ public static RocksIteratorWrapper getRocksIterator(
     		ReadOptions readOptions) {
     		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
     	}
    +
    +	/**
    +	 * Encapsulates the logic and resources in connection with creating priority queue state structures.
    +	 */
    +	class RocksDBPriorityQueueFactory implements PriorityQueueSetFactory, AutoCloseable {
    +
    +		/** Default cache size per key-group. */
    +		private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
    +
    +		/** A shared buffer to serialize elements for the priority queue. */
    +		@Nonnull
    +		private final ByteArrayOutputStreamWithPos elementSerializationOutStream;
    +
    +		/** A shared adapter wrapper around elementSerializationOutStream to become a {@link DataOutputView}. */
    +		@Nonnull
    +		private final DataOutputViewStreamWrapper elementSerializationOutView;
    +
    +		/** A shared {@link RocksDBWriteBatchWrapper} to batch modifications to priority queues. */
    +		@Nonnull
    +		private final RocksDBWriteBatchWrapper writeBatchWrapper;
    +
    +		/** Map to track all column families created to back priority queues. */
    +		@Nonnull
    +		private final Map<String, ColumnFamilyHandle> priorityQueueColumnFamilies;
    +
    +		RocksDBPriorityQueueFactory() {
    +			this.elementSerializationOutStream = new ByteArrayOutputStreamWithPos();
    +			this.elementSerializationOutView = new DataOutputViewStreamWrapper(elementSerializationOutStream);
    +			this.writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions);
    +			this.priorityQueueColumnFamilies = new HashMap<>();
    +		}
    +
    +		@Override
    +		public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> create(
    +			@Nonnull String stateName,
    +			@Nonnull TypeSerializer<T> byteOrderedElementSerializer,
    +			@Nonnull Comparator<T> elementComparator,
    +			@Nonnull KeyExtractorFunction<T> keyExtractor) {
    --- End diff --
    
    Indentation


---

[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6276
  
    @tillrohrmann Thanks for the fast review. Merging.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201010986
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java ---
    @@ -0,0 +1,31 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Set;
    +
    +/**
    + *
    --- End diff --
    
    Even if it is a temporary commit, please add JavaDocs in the commit where you add the interface.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201013962
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java ---
    @@ -0,0 +1,45 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.util.Comparator;
    +
    +/**
    + *
    + */
    --- End diff --
    
    Same here with the JavaDocs. If you add JavaDocs later on, then please use fixup commits to combine these commits with the original commit. Otherwise it makes less sense to have separate commits in the first place.


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201019320
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -37,10 +62,22 @@
     	/**
     	 * Returns the key that is bound to this timer.
     	 */
    +	@Nonnull
     	K getKey();
     
     	/**
     	 * Returns the namespace that is bound to this timer.
     	 */
    +	@Nonnull
     	N getNamespace();
    +
    +	@SuppressWarnings("unchecked")
    +	static <T extends InternalTimer> Comparator<T> getTimerComparator() {
    +		return (Comparator<T>) TIMER_COMPARATOR;
    +	}
    +
    +	@SuppressWarnings("unchecked")
    +	static <T extends InternalTimer> KeyExtractorFunction<T> getKeyExtractorFunction() {
    --- End diff --
    
    Raw usage of `InternalTimer`
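
A sketch of how the raw type in the bound could be avoided, using a hypothetical, simplified `Timer<K, N>` interface rather than Flink's `InternalTimer`. The bound becomes `Timer<?, ?>`, and because this variant returns a lambda instead of casting a shared constant, it also needs no `@SuppressWarnings("unchecked")`. That is a deliberate simplification, not the approach shown in the diff.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class WildcardBoundSketch {

    /** Hypothetical, simplified timer with key type K and namespace type N. */
    interface Timer<K, N> {
        long getTimestamp();
    }

    /** Simple implementation used only to exercise the comparator. */
    static final class SimpleTimer implements Timer<String, String> {

        private final long timestamp;

        SimpleTimer(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public long getTimestamp() {
            return timestamp;
        }
    }

    /** The bound uses Timer<?, ?> instead of the raw type Timer. */
    static <T extends Timer<?, ?>> Comparator<T> getTimerComparator() {
        return (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp());
    }

    /** Sorts timers by timestamp and returns the earliest one. */
    static long earliest(long... timestamps) {
        List<SimpleTimer> timers = new ArrayList<>();
        for (long ts : timestamps) {
            timers.add(new SimpleTimer(ts));
        }
        Comparator<SimpleTimer> byTimestamp = getTimerComparator();
        timers.sort(byTimestamp);
        return timers.get(0).getTimestamp();
    }

    public static void main(String[] args) {
        System.out.println(earliest(30L, 10L, 20L)); // prints 10
    }
}
```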


---

[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6276#discussion_r201019266
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -37,10 +62,22 @@
     	/**
     	 * Returns the key that is bound to this timer.
     	 */
    +	@Nonnull
     	K getKey();
     
     	/**
     	 * Returns the namespace that is bound to this timer.
     	 */
    +	@Nonnull
     	N getNamespace();
    +
    +	@SuppressWarnings("unchecked")
    +	static <T extends InternalTimer> Comparator<T> getTimerComparator() {
    --- End diff --
    
    Raw usage of `InternalTimer`


---