Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/09/25 14:39:01 UTC

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4722

    [FLINK-7683] Iterate over keys in KeyedStateBackend

    ## What is the purpose of the change
    
    This is required to make it possible to preserve backward compatibility while changing the state definition of a keyed state operator.
    
    ## Verifying this change
    
    This change adds new unit tests to cover the new feature.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink keys

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4722.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4722
    
----
commit ac41b26cdb5c2341bb25e520c4d449ec4e956a8f
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-09-14T10:39:30Z

    [FLINK-7683] Iterate over keys in KeyedStateBackend

----


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141258536
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1946,4 +1974,44 @@ public File getInstanceBasePath() {
     	public boolean supportsAsynchronousSnapshots() {
     		return true;
     	}
    +
    +	private static class RocksIteratorWrapper<K> implements Iterator<K> {
    +		private final RocksIterator iterator;
    +		private final String field;
    +		private final TypeSerializer<K> keySerializer;
    +		private final int keyGroupPrefixBytes;
    +
    +		public RocksIteratorWrapper(
    +				RocksIterator iterator,
    +				String field,
    +				TypeSerializer<K> keySerializer,
    +				int keyGroupPrefixBytes) {
    +			this.iterator = Preconditions.checkNotNull(iterator);
    +			this.field = Preconditions.checkNotNull(field);
    +			this.keySerializer = Preconditions.checkNotNull(keySerializer);
    +			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			return iterator.isValid();
    +		}
    +
    +		@Override
    +		public K next() {
    +			if (!iterator.isValid()) {
    --- End diff --
    
    fixed


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141034332
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---
    @@ -442,6 +444,32 @@ public void testDisposeDeletesAllDirectories() throws Exception {
     	}
     
     	@Test
    +	public void testGetKeys() throws Exception {
    --- End diff --
    
    This test should go into `StateBackendTestBase`. This way, you don't have to repeat it for the Rocks and the Heap backend.


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141390478
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -190,6 +193,39 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
     	}
     
     	@Test
    +	public void testGetKeys() throws Exception {
    +		final int elementsToTest = 1000;
    +		String fieldName = "get-keys-while-modifying-test";
    --- End diff --
    
    nit: this name is a bit misleading now


---

[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4722
  
    I have one more talking point for this: the behaviour of the iterators for the RocksDB and heap backends is different now. For RocksDB we iterate over a snapshot of the data, so modifications made concurrently with the iteration (by that, I mean not in a different thread, but just interleaved with the iteration in the same thread) will work and will not be reflected in the iterator. In the heap backend, the concurrent modifications will make the iterator fail. Is this a problem for the intended use case, and what is the expected behaviour?
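    
    For context, a minimal sketch (illustrative only, not this PR's exact code) of what snapshot-based iteration looks like with the RocksJava API:
    
    ```
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ReadOptions;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksIterator;
    import org.rocksdb.Snapshot;
    
    final class SnapshotIterationSketch {
    
        /** Iterates over the keys of one column family as of the time the snapshot was taken. */
        static void iterateOverSnapshot(RocksDB db, ColumnFamilyHandle columnFamily) {
            Snapshot snapshot = db.getSnapshot();
            try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
                    RocksIterator iterator = db.newIterator(columnFamily, readOptions)) {
                for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                    byte[] serializedKey = iterator.key();
                    // Interleaved writes to the DB are not visible through this iterator.
                }
            } finally {
                db.releaseSnapshot(snapshot);
            }
        }
    }
    ```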


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4722


---

[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4722
  
    We got carried away with those concurrent modifications and snapshots. To rewrite a keyed state variable from an old definition to a new one, the old state is used as read-only. Thus `getKeys` for the old one doesn't have to care about any modifications.
    
    I have reverted the RocksDB snapshot changes.
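    
    For illustration, a rough sketch of such a rewrite, using the same test-facing API as the unit test quoted elsewhere in this thread (descriptor names, value types, and the surrounding method are assumptions for the example; the old state is only read):
    
    ```
    void migrateState(AbstractKeyedStateBackend<Integer> backend) throws Exception {
        ValueState<Integer> oldState = backend.getOrCreateKeyedState(
            VoidNamespaceSerializer.INSTANCE,
            new ValueStateDescriptor<>("old-field", IntSerializer.INSTANCE));
        ValueState<Long> newState = backend.getOrCreateKeyedState(
            VoidNamespaceSerializer.INSTANCE,
            new ValueStateDescriptor<>("new-field", LongSerializer.INSTANCE));
        ((InternalValueState<VoidNamespace, Integer>) oldState).setCurrentNamespace(VoidNamespace.INSTANCE);
        ((InternalValueState<VoidNamespace, Long>) newState).setCurrentNamespace(VoidNamespace.INSTANCE);
    
        try (Stream<Integer> keys = backend.getKeys("old-field", VoidNamespace.INSTANCE)) {
            for (Integer key : (Iterable<Integer>) keys::iterator) {
                backend.setCurrentKey(key);
                // Only read the old state; all writes go to the new state.
                newState.update(oldState.value().longValue());
            }
        }
    }
    ```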


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r140908693
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
    +	@Override
    +	public <N> Stream<K> getKeys(String field, N namespace) {
    +		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(field);
    +		if (columnInfo == null) {
    +			return Stream.empty();
    +		}
    +
    +		RocksIterator iterator = db.newIterator(columnInfo.f0);
    +		iterator.seekToFirst();
    +
    +		Iterator<K> sourceIterator = new Iterator<K>() {
    +			@Override
    +			public boolean hasNext() {
    +				return iterator.isValid();
    +			}
    +
    +			@Override
    +			public K next() {
    +				try {
    +					byte[] key = iterator.key();
    +
    +					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
    +						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
    +					K value = keySerializer.deserialize(dataInput);
    +					iterator.next();
    --- End diff --
    
    This will throw exceptions implicitly if `iterator` doesn't have any more elements.
    
    I'd rather have it fail early and faster by checking `iterator.hasNext()` first and throwing exceptions explicitly.


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141390556
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
    @@ -190,6 +193,39 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
     	}
     
     	@Test
    +	public void testGetKeys() throws Exception {
    +		final int elementsToTest = 1000;
    +		String fieldName = "get-keys-while-modifying-test";
    +		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
    +		try {
    +			ValueState<Integer> keyedState = backend.getOrCreateKeyedState(
    +				VoidNamespaceSerializer.INSTANCE,
    +				new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE));
    +			((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +			for (int key = 0; key < elementsToTest; key++) {
    +				backend.setCurrentKey(key);
    +				keyedState.update(key * 2);
    +			}
    +
    +			try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) {
    +				PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
    +
    +				for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) {
    +					assertTrue(actualIterator.hasNext());
    +					assertEquals(expectedKey, actualIterator.nextInt());
    +				}
    +
    +				assertFalse(actualIterator.hasNext());
    +			}
    +		}
    +		finally {
    +			org.apache.commons.io.IOUtils.closeQuietly(backend);
    --- End diff --
    
    nit: why do we need the fully qualified name here?


---

[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4722
  
    LGTM!
    
    One more ask: can you please add comments to both the class and the code explaining that concurrent modification will fail the iteration?


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141136758
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -1946,4 +1974,44 @@ public File getInstanceBasePath() {
     	public boolean supportsAsynchronousSnapshots() {
     		return true;
     	}
    +
    +	private static class RocksIteratorWrapper<K> implements Iterator<K> {
    +		private final RocksIterator iterator;
    +		private final String field;
    +		private final TypeSerializer<K> keySerializer;
    +		private final int keyGroupPrefixBytes;
    +
    +		public RocksIteratorWrapper(
    +				RocksIterator iterator,
    +				String field,
    +				TypeSerializer<K> keySerializer,
    +				int keyGroupPrefixBytes) {
    +			this.iterator = Preconditions.checkNotNull(iterator);
    +			this.field = Preconditions.checkNotNull(field);
    +			this.keySerializer = Preconditions.checkNotNull(keySerializer);
    +			this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
    +		}
    +
    +		@Override
    +		public boolean hasNext() {
    +			return iterator.isValid();
    +		}
    +
    +		@Override
    +		public K next() {
    +			if (!iterator.isValid()) {
    --- End diff --
    
    minor: I usually call `hasNext()` here, otherwise it'll duplicate the logic:
    
    ```
    if (!hasNext()) {
        throw new NoSuchElementException();
    }
    ```


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r140821446
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ---
    @@ -287,6 +289,14 @@ public S get(K key, N namespace) {
     	}
     
     	@Override
    +	public Stream<K> getKeys(N namespace) {
    +		return Arrays.stream(primaryTable)
    --- End diff --
    
    This hash table applies incremental rehashing. This method must also consider the data stored in ``incrementalRehashTable`` if it isn't ``null``, otherwise we can miss some data.
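    
    For illustration, a minimal self-contained sketch (simplified stand-in types, not Flink's actual classes) of streaming keys from both tables so that entries already moved by incremental rehashing are not missed:
    
    ```
    import java.util.Arrays;
    import java.util.Objects;
    import java.util.stream.Stream;
    
    final class TwoTableKeys {
    
        /** Simplified stand-in for a hash-bucket entry with a chain pointer. */
        static final class Entry<K, N> {
            final K key;
            final N namespace;
            final Entry<K, N> next;
    
            Entry(K key, N namespace, Entry<K, N> next) {
                this.key = key;
                this.namespace = namespace;
                this.next = next;
            }
        }
    
        /** Streams all keys of the given namespace from both the primary and the rehash table. */
        static <K, N> Stream<K> getKeys(Entry<K, N>[] primaryTable, Entry<K, N>[] incrementalRehashTable, N namespace) {
            return Stream.concat(Arrays.stream(primaryTable), Arrays.stream(incrementalRehashTable))
                .filter(Objects::nonNull)
                .flatMap(TwoTableKeys::chain)
                .filter(entry -> entry.namespace.equals(namespace))
                .map(entry -> entry.key);
        }
    
        /** Expands one bucket's linked chain into a stream of entries. */
        private static <K, N> Stream<Entry<K, N>> chain(Entry<K, N> head) {
            Stream.Builder<Entry<K, N>> builder = Stream.builder();
            for (Entry<K, N> entry = head; entry != null; entry = entry.next) {
                builder.add(entry);
            }
            return builder.build();
        }
    }
    ```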


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r140907699
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
    +	@Override
    +	public <N> Stream<K> getKeys(String field, N namespace) {
    +		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(field);
    +		if (columnInfo == null) {
    +			return Stream.empty();
    +		}
    +
    +		RocksIterator iterator = db.newIterator(columnInfo.f0);
    +		iterator.seekToFirst();
    +
    +		Iterator<K> sourceIterator = new Iterator<K>() {
    +			@Override
    +			public boolean hasNext() {
    --- End diff --
    
    Is there a lock in RocksDB that will guard the iteration against state changes by users?


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141083304
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
    +	@Override
    +	public <N> Stream<K> getKeys(String field, N namespace) {
    +		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(field);
    +		if (columnInfo == null) {
    +			return Stream.empty();
    +		}
    +
    +		RocksIterator iterator = db.newIterator(columnInfo.f0);
    +		iterator.seekToFirst();
    +
    +		Iterator<K> sourceIterator = new Iterator<K>() {
    +			@Override
    +			public boolean hasNext() {
    --- End diff --
    
    I've changed this code to iterate over a snapshot of the DB, so this should be fine for RocksDB now.


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141090682
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
    +	@Override
    +	public <N> Stream<K> getKeys(String field, N namespace) {
    +		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(field);
    +		if (columnInfo == null) {
    +			return Stream.empty();
    +		}
    +
    +		RocksIterator iterator = db.newIterator(columnInfo.f0);
    +		iterator.seekToFirst();
    +
    +		Iterator<K> sourceIterator = new Iterator<K>() {
    +			@Override
    +			public boolean hasNext() {
    +				return iterator.isValid();
    +			}
    +
    +			@Override
    +			public K next() {
    +				try {
    +					byte[] key = iterator.key();
    +
    +					DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
    +						new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
    +					K value = keySerializer.deserialize(dataInput);
    +					iterator.next();
    --- End diff --
    
    Added a sanity check at the beginning of this method.


---

[GitHub] flink issue #4722: [FLINK-7683] Iterate over keys in KeyedStateBackend

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/4722
  
    LGTM


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by bowenli86 <gi...@git.apache.org>.
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r140907101
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -252,6 +257,43 @@ public RocksDBKeyedStateBackend(
     		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
     	}
     
    +	@Override
    +	public <N> Stream<K> getKeys(String field, N namespace) {
    +		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(field);
    +		if (columnInfo == null) {
    +			return Stream.empty();
    +		}
    +
    +		RocksIterator iterator = db.newIterator(columnInfo.f0);
    +		iterator.seekToFirst();
    +
    +		Iterator<K> sourceIterator = new Iterator<K>() {
    --- End diff --
    
    I believe it's better to move the implementation of this iterator into a standalone or inner class.


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141100065
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java ---
    @@ -287,6 +289,14 @@ public S get(K key, N namespace) {
     	}
     
     	@Override
    +	public Stream<K> getKeys(N namespace) {
    +		return Arrays.stream(primaryTable)
    --- End diff --
    
    I also added a larger test that covers this case.


---

[GitHub] flink pull request #4722: [FLINK-7683] Iterate over keys in KeyedStateBacken...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141272559
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
    @@ -37,6 +39,12 @@
     	void setCurrentKey(K newKey);
     
     	/**
    +	 * @return Stream of existing keys in the namespace.
    --- End diff --
    
    Nit: This could be changed to "Returns a stream of all keys for the given state and namespace." The first parameter should be renamed to `state` or something like that, because `field` is not used anywhere else, I think.
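    
    For reference, the interface method with the suggested wording might then look like this (parameter descriptions are illustrative, not taken from the PR):
    
    ```
    /**
     * Returns a stream of all keys for the given state and namespace.
     *
     * @param state the (descriptor) name of the state whose keys are returned
     * @param namespace the namespace whose keys are returned
     * @return a stream of all existing keys for the given state and namespace
     */
    <N> Stream<K> getKeys(String state, N namespace);
    ```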


---