You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2018/07/12 14:50:00 UTC

[jira] [Closed] (FLINK-9804) KeyedStateBackend.getKeys() does not work on RocksDB MapState

     [ https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou closed FLINK-9804.
-----------------------------
    Resolution: Fixed

Merged in:

1.6.0: def2aed5c75b5a00815186d3343e66cb1dc01ac0

1.5.2: 8a564f82aa521670a0b6813c5deb65586b1fa136

> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
> 	final int namespace1ElementsNum = 1000;
> 	final int namespace2ElementsNum = 1000;
> 	String fieldName = "get-keys-test";
> 	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
> 	try {
> 		final String ns1 = "ns1";
> 		MapState<String, Integer> keyedState1 = backend.getPartitionedState(
> 			ns1,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
> 		);
> 		for (int key = 0; key < namespace1ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState1.put("he", key * 2);
> 			keyedState1.put("ho", key * 2);
> 		}
> 		final String ns2 = "ns2";
> 		MapState<String, Integer> keyedState2 = backend.getPartitionedState(
> 			ns2,
> 			StringSerializer.INSTANCE,
> 			new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
> 		);
> 		for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
> 			backend.setCurrentKey(key);
> 			keyedState2.put("he", key * 2);
> 			keyedState2.put("ho", key * 2);
> 		}
> 		// valid for namespace1
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 		// valid for namespace2
> 		try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
> 			PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
> 			for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
> 				assertTrue(actualIterator.hasNext());
> 				assertEquals(expectedKey, actualIterator.nextInt());
> 			}
> 			assertFalse(actualIterator.hasNext());
> 		}
> 	}
> 	finally {
> 		IOUtils.closeQuietly(backend);
> 		backend.dispose();
> 	}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)