Posted to issues@flink.apache.org by "vinoyang (JIRA)" <ji...@apache.org> on 2018/07/11 12:17:00 UTC
[jira] [Assigned] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState
[ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang reassigned FLINK-9804:
-------------------------------
Assignee: vinoyang
> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
> Key: FLINK-9804
> URL: https://issues.apache.org/jira/browse/FLINK-9804
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.5.1
> Reporter: Aljoscha Krettek
> Assignee: vinoyang
> Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>     final int namespace1ElementsNum = 1000;
>     final int namespace2ElementsNum = 1000;
>     String fieldName = "get-keys-test";
>     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         final String ns1 = "ns1";
>         MapState<String, Integer> keyedState1 = backend.getPartitionedState(
>             ns1,
>             StringSerializer.INSTANCE,
>             new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>         );
>         for (int key = 0; key < namespace1ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState1.put("he", key * 2);
>             keyedState1.put("ho", key * 2);
>         }
>         final String ns2 = "ns2";
>         MapState<String, Integer> keyedState2 = backend.getPartitionedState(
>             ns2,
>             StringSerializer.INSTANCE,
>             new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>         );
>         for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
>             backend.setCurrentKey(key);
>             keyedState2.put("he", key * 2);
>             keyedState2.put("ho", key * 2);
>         }
>         // valid for namespace1
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
>             PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey, actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>         // valid for namespace2
>         try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
>             PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
>             for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>                 assertTrue(actualIterator.hasNext());
>                 assertEquals(expectedKey, actualIterator.nextInt());
>             }
>             assertFalse(actualIterator.hasNext());
>         }
>     }
>     finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}
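The behavior the test above expects from getKeys() can be illustrated with a small self-contained model. This is only a sketch and not Flink code: ToyMapStateStore and EntryKey are hypothetical names, and the flat layout (one stored entry per key, namespace and map key) is an assumption made for illustration. It shows that when each key owns several map entries, a key listing for one namespace has to filter by namespace and return every key exactly once.

```java
import java.util.TreeMap;
import java.util.stream.Stream;

/** Toy stand-in for a keyed map-state store; hypothetical, not a Flink class. */
final class ToyMapStateStore {

    /** Composite entry key: one flat entry per (key, namespace, mapKey). */
    private static final class EntryKey implements Comparable<EntryKey> {
        final int key;
        final String namespace;
        final String mapKey;

        EntryKey(int key, String namespace, String mapKey) {
            this.key = key;
            this.namespace = namespace;
            this.mapKey = mapKey;
        }

        @Override
        public int compareTo(EntryKey other) {
            int c = Integer.compare(key, other.key);
            if (c != 0) {
                return c;
            }
            c = namespace.compareTo(other.namespace);
            if (c != 0) {
                return c;
            }
            return mapKey.compareTo(other.mapKey);
        }
    }

    // Sorted flat storage: every map entry of every key lives in one ordered map.
    private final TreeMap<EntryKey, Integer> entries = new TreeMap<>();

    /** Stores one map entry for the given key and namespace. */
    void put(int key, String namespace, String mapKey, int value) {
        entries.put(new EntryKey(key, namespace, mapKey), value);
    }

    /** Returns each key with at least one entry in the namespace, exactly once. */
    Stream<Integer> getKeys(String namespace) {
        return entries.keySet().stream()
            .filter(e -> e.namespace.equals(namespace)) // skip other namespaces
            .map(e -> e.key)                            // drop the map key part
            .distinct();                                // one result per key
    }
}
```

With two map entries ("he" and "ho") per key, as in the test, getKeys("ns1") in this model yields each key of ns1 once and no key of ns2, which is the contract the assertions in the test check against the real backends.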