Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/06 03:43:43 UTC

[GitHub] [flink-table-store] tsreaper opened a new pull request, #200: [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior

tsreaper opened a new pull request, #200:
URL: https://github.com/apache/flink-table-store/pull/200

   Currently the commit user name is not stored in state. If a user stops the job with a savepoint and later recovers it, the commit user name will change and duplicated snapshots may be committed to the file store. This PR fixes that issue.
   
   This PR also corrects the behavior of the `endInput` method in `CommitterOperator`. According to the documentation of `BoundedOneInput`, this method should only flush data and must never commit.
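
   A minimal sketch of that contract (a hypothetical operator with a stand-in `flushPendingCommittables` helper, not the actual `CommitterOperator` code; the actual commit stays in `notifyCheckpointComplete`):

       import org.apache.flink.streaming.api.operators.BoundedOneInput;

       // Hypothetical illustration of the corrected endInput behavior.
       class FlushOnlyOperator implements BoundedOneInput {
           @Override
           public void endInput() throws Exception {
               // flush buffered records so the final checkpoint can see them,
               // but never commit a snapshot from here
               flushPendingCommittables();
           }

           // stand-in for the operator's real flush logic
           private void flushPendingCommittables() {}
       }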



[GitHub] [flink-table-store] JingsongLi merged pull request #200: [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior

Posted by GitBox <gi...@apache.org>.
JingsongLi merged PR #200:
URL: https://github.com/apache/flink-table-store/pull/200



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #200: [FLINK-28221] Store commit user name into Flink sink state and correct endInput behavior

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #200:
URL: https://github.com/apache/flink-table-store/pull/200#discussion_r914403345


##########
flink-table-store-format/src/main/java/org/apache/flink/table/store/format/avro/AbstractAvroBulkFormat.java:
##########
@@ -173,7 +173,18 @@ public RecordIterator<T> readBatch() throws IOException {
                     iterator,
                     currentBlockStart,
                     recordsToSkip,
-                    () -> pool.recycler().recycle(reuse));
+                    new Runnable() {
+                        private boolean shouldRecycle = true;
+
+                        @Override
+                        public void run() {
+                            // make sure this method is reentrant

Review Comment:
   Why does this method need to be reentrant?
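
   For context, the truncated diff above suggests a guard flag that makes the callback idempotent. A minimal, self-contained sketch of that pattern (a plain Deque stands in for the real object pool, and `reuse` for the reused Avro batch):

       import java.util.ArrayDeque;
       import java.util.Deque;

       class RecycleOnceDemo {
           static final Deque<Object> pool = new ArrayDeque<>();

           static Runnable recycleOnce(Object reuse) {
               return new Runnable() {
                   private boolean shouldRecycle = true;

                   @Override
                   public void run() {
                       // calling run() more than once must return `reuse`
                       // to the pool only once
                       if (shouldRecycle) {
                           shouldRecycle = false;
                           pool.push(reuse);
                       }
                   }
               };
           }

           public static void main(String[] args) {
               Runnable r = recycleOnce(new Object());
               r.run();
               r.run(); // no-op: the guard flag was already cleared
               System.out.println(pool.size()); // prints 1
           }
       }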



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java:
##########
@@ -86,11 +93,31 @@ public CommitterOperator(
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        committer = committerFactory.get();
+
+        // commit user name state of this job
+        // each job can only have one user name and this name must be consistent across restarts
+        ListState<String> commitUserState =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("commit_user_state", String.class));
+        // we cannot use the job id as the commit user name here because the user may change the
+        // job id by creating a savepoint, stopping the job and then resuming from that savepoint
+        while (true) {
+            for (String user : commitUserState.get()) {

Review Comment:
   Just assert that there is only one entry if `isRestored()`?
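
   For reference, the truncated hunk above appears to either reuse a restored commit user or fall back to a fresh one. A hedged reconstruction of that logic in plain Java (an Iterable and a List stand in for the ListState; the UUID fallback is an assumption, not the actual PR code):

       import java.util.List;
       import java.util.UUID;

       class CommitUserRestoreDemo {
           static String restoreOrCreate(Iterable<String> restoredUsers, List<String> newState) {
               for (String user : restoredUsers) {
                   // restored from a checkpoint or savepoint: keep the original
                   // commit user so duplicate snapshots can be filtered out
                   return user;
               }
               // fresh start: generate a commit user and persist it for restarts
               String fresh = UUID.randomUUID().toString();
               newState.add(fresh);
               return fresh;
           }
       }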



##########
flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileStoreSinkITCase.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** IT cases for {@link StoreSink} when writing file store. */
+public class FileStoreSinkITCase extends AbstractTestBase {

Review Comment:
   `SinkSavepointITCase`?



##########
pom.xml:
##########
@@ -422,7 +422,6 @@ under the License.
                     <trimStackTrace>false</trimStackTrace>
                     <systemPropertyVariables>
                         <forkNumber>0${surefire.forkNumber}</forkNumber>
-                        <checkpointing.randomization>true</checkpointing.randomization>

Review Comment:
   Can you add some comments explaining why we don't support random checkpointing?



##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/AbstractTableStoreFactory.java:
##########
@@ -70,9 +72,17 @@ public TableStoreSource createDynamicTableSource(Context context) {
 
     @Override
     public TableStoreSink createDynamicTableSink(Context context) {
+        boolean checkpointEnabled =
+                context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                                == RuntimeExecutionMode.STREAMING
+                        && context.getConfiguration()
+                                .getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
+                                .map(d -> d.compareTo(Duration.ZERO) > 0)
+                                .orElse(false);

Review Comment:
   `FlinkSinkBuilder.build` can get all information from `StreamExecutionEnvironment`.
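
   As I read it, the suggestion is to derive this flag inside the builder from the environment rather than from table factory options. A minimal sketch of what that lookup could use (a hypothetical helper, not the actual `FlinkSinkBuilder` code):

       import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

       class CheckpointProbeDemo {
           // CheckpointConfig#isCheckpointingEnabled is true exactly when a
           // positive checkpoint interval is configured on the environment
           static boolean checkpointEnabled(StreamExecutionEnvironment env) {
               return env.getCheckpointConfig().isCheckpointingEnabled();
           }
       }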



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org