You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/08/24 09:05:46 UTC

[GitHub] [flink] Myasuka commented on a change in pull request #13179: [FLINK-18978][state-backends] Support full table scan of key and namespace from statebackend

Myasuka commented on a change in pull request #13179:
URL: https://github.com/apache/flink/pull/13179#discussion_r475395736



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
##########
@@ -75,6 +76,13 @@
 	 */
 	<N> Stream<K> getKeys(String state, N namespace);
 
+	/**
+	 * @return A stream of all keys for the given state and namespace. Modifications to the state during iterating
+	 * 		   over it keys are not supported.
+	 * @param state State variable for which existing keys will be returned.

Review comment:
       With the current implementation, we cannot guarantee that all entries within one namespace are returned consecutively (one after another). I think we should add a note about this to the Javadoc here.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys
+ * and namespaces. This class is not thread safe.
+ *
+ * @param <K> the type of the iterated keys in RocksDB.
+ * @param <N> the type of the iterated namespaces in RocksDB.
+ */
+public class RocksStateKeysAndNamespaceIterator<K, N> implements Iterator<Tuple2<K, N>>, AutoCloseable {

Review comment:
       I think this class could share some code with `RocksStateKeysIterator`, as the main difference is that they return different entries when calling `next()`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysAndNamespaceIterator.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys
+ * and namespaces. This class is not thread safe.
+ *
+ * @param <K> the type of the iterated keys in RocksDB.
+ * @param <N> the type of the iterated namespaces in RocksDB.
+ */
+public class RocksStateKeysAndNamespaceIterator<K, N> implements Iterator<Tuple2<K, N>>, AutoCloseable {
+
+	@Nonnull
+	private final RocksIteratorWrapper iterator;
+
+	@Nonnull
+	private final String state;
+
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	@Nonnull
+	private final TypeSerializer<N> namespaceSerializer;
+
+	private final boolean ambiguousKeyPossible;
+	private final int keyGroupPrefixBytes;
+	private final DataInputDeserializer byteArrayDataInputView;
+	private Tuple2<K, N> nextKey;
+
+	public RocksStateKeysAndNamespaceIterator(
+		@Nonnull RocksIteratorWrapper iterator,
+		@Nonnull String state,
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull TypeSerializer<N> namespaceSerializer, int keyGroupPrefixBytes,

Review comment:
       This line should be split so that `int keyGroupPrefixBytes` is on its own line, like the other constructor parameters.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamespaceIterator;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.

Review comment:
       This Javadoc should be updated: it still says the tests are for `RocksIteratorWrapper`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysAndNamespacesIteratorTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamespaceIterator;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksStateKeysAndNamespacesIteratorTest {

Review comment:
       I think this class might be able to share some code with `RocksDBRocksStateKeysIteratorTest`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org