Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 06:04:32 UTC

[GitHub] [flink] JingsongLi opened a new pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

JingsongLi opened a new pull request #13852:
URL: https://github.com/apache/flink/pull/13852


   
   ## What is the purpose of the change
   
   Integrate file compaction into the filesystem connector
   
   ## Brief change log
   
   Introduce filesystem connector options (see the usage sketch after this change log):
   - AUTO_COMPACTION
   - COMPACTION_FILE_SIZE (defaults to the rolling file size)
   
   Complete compaction:
   - Introduce `CompactFileWriter`
   - Introduce compaction to `FileSystemTableSink`
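   
   A minimal usage sketch of the new options (illustrative only: the table name, path, format, and the `compaction.file-size` key are assumptions, not taken from this PR):
   
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AutoCompactionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'auto-compaction' comes from this PR; the 'compaction.file-size' key,
        // path and format below are assumptions for illustration only.
        tEnv.executeSql(
                "CREATE TABLE compacted_sink (a INT, b STRING, c STRING) WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = '/tmp/compacted-sink'," +
                " 'format' = 'parquet'," +
                " 'auto-compaction' = 'true'," +
                " 'compaction.file-size' = '128MB')");
    }
}
```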
   
   ## Verifying this change
   
   - `CsvFileCompactionITCase`: compaction based on `FileInputFormat`
   - `ParquetFileCompactionITCase`: compaction based on `BulkFormat`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659",
       "triggerID" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650) 
   * cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650) 
   * cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lirui-apache commented on a change in pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
lirui-apache commented on a change in pull request #13852:
URL: https://github.com/apache/flink/pull/13852#discussion_r515763622



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();

Review comment:
       Why do we need to call clear here?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));

Review comment:
       Can we reuse the `COMPACTED_PREFIX` defined in `CompactOperator`?
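
    A sketch of the suggested reuse, as a drop-in fragment for the filter above (it assumes `CompactOperator` lives in the new `stream.compact` package and that `COMPACTED_PREFIX` is a public constant equal to "compacted-"):

```java
import org.apache.flink.table.filesystem.stream.compact.CompactOperator;

// Hedged sketch of the reviewer's suggestion: build the filter from the
// constant instead of repeating the "compacted-" literal in the test.
File[] files = new File(URI.create(resultPath)).listFiles(
        (dir, name) -> name.startsWith(CompactOperator.COMPACTED_PREFIX + "part-"));
```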

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);
+
+		String fileName = files[0].getName();
+		assertTrue(fileName, fileName.startsWith("compacted-part-"));

Review comment:
       This seems redundant, because we list files with a filter that only returns files with this prefix.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -161,20 +190,120 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
 						.withOutputFileConfig(outputFileConfig)
 						.withRollingPolicy(rollingPolicy);
 			}
-			return createStreamingSink(
-					tableOptions,
+
+			long bucketCheckInterval = tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
+
+			DataStream<PartitionCommitInfo> writerStream;
+			if (autoCompaction) {
+				long compactionSize = tableOptions
+						.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
+						.orElse(tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE))
+						.getBytes();
+
+				CompactReader.Factory<RowData> reader = createCompactReader(sinkContext).orElseThrow(
+						() -> new TableException("Please implement available reader for compaction:" +
+								" BulkFormat, FileInputFormat."));
+
+				writerStream = StreamingSink.compactionWriter(
+						dataStream,
+						bucketCheckInterval,
+						bucketsBuilder,
+						fsFactory,
+						path,
+						reader,
+						compactionSize);
+			} else {
+				writerStream = StreamingSink.writer(
+						dataStream, bucketCheckInterval, bucketsBuilder);
+			}
+
+			return StreamingSink.sink(
+					writerStream,
 					path,
-					partitionKeys,
 					tableIdentifier,
-					overwrite,
-					dataStream,
-					bucketsBuilder,
+					partitionKeys,
 					metaStoreFactory,
 					fsFactory,
-					tableOptions.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
+					tableOptions);
+		}
+	}
+
+	private Optional<CompactReader.Factory<RowData>> createCompactReader(Context context) {

Review comment:
       `createCompactReaderFactory`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -138,15 +154,28 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
 			return dataStream.writeUsingOutputFormat(builder.build())
 					.setParallelism(dataStream.getParallelism());
 		} else {
+			if (overwrite) {
+				throw new IllegalStateException("Streaming mode not support overwrite.");
+			}
+
+			boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
 			Object writer = createWriter(sinkContext);
+			boolean isEncoder = writer instanceof Encoder;
 			TableBucketAssigner assigner = new TableBucketAssigner(computer);
 			TableRollingPolicy rollingPolicy = new TableRollingPolicy(
-					!(writer instanceof Encoder),
+					!isEncoder || autoCompaction,
 					tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
 					tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
 
+			if (autoCompaction) {
+				outputFileConfig = OutputFileConfig.builder()

Review comment:
       Can we have a method to create a builder from an `OutputFileConfig` instance, to make sure we won't lose anything here?
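
    A rough sketch of what such a helper could look like (the `builderFrom` name and shape are assumptions, not existing API; it presumes `OutputFileConfig` exposes getters for the part prefix and suffix):

```java
// Hypothetical addition to OutputFileConfig (name and getters assumed):
public static OutputFileConfigBuilder builderFrom(OutputFileConfig config) {
    return OutputFileConfig.builder()
            .withPartPrefix(config.getPartPrefix())
            .withPartSuffix(config.getPartSuffix());
}

// The sink could then rewrite only the prefix without losing other settings:
outputFileConfig = builderFrom(outputFileConfig)
        .withPartPrefix(convertToUncompacted(outputFileConfig.getPartPrefix()))
        .build();
```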

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/FileInputFormatReader.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem.stream.compact;
+
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+
+import java.io.IOException;
+
+/**
+ * The {@link CompactReader} to delegate {@link FileInputFormat}.
+ */
+public class FileInputFormatReader<T> implements CompactReader<T> {

Review comment:
       FileInputFormatCompactReader?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);

Review comment:
       Can we also verify that there are no un-compacted files left?
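
    One way the extra check might look (a sketch; the ".uncompacted-" prefix for not-yet-compacted files is an assumption here):

```java
// Sketch: also assert no leftover uncompacted files remain in the sink directory
// (the ".uncompacted-" prefix is assumed for illustration).
File[] uncompacted = new File(URI.create(resultPath)).listFiles(
        (dir, name) -> name.startsWith(".uncompacted-"));
assertEquals(Arrays.toString(uncompacted), 0, uncompacted.length);
```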

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -185,4 +185,17 @@
 					.defaultValue("_SUCCESS")
 					.withDescription("The file name for success-file partition commit policy," +
 							" default is '_SUCCESS'.");
+
+	public static final ConfigOption<Boolean> AUTO_COMPACTION =
+			key("auto-compaction")
+					.booleanType()
+					.defaultValue(false)
+					.withDescription("Whether to enable automatic compaction in streaming sink or not." +

Review comment:
       Let's add some more high-level explanation here, e.g. what will be compacted, when the compaction happens, whether files are usable before compaction, etc.
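
    For example, the description could be expanded roughly along these lines (wording is only a sketch of what a fuller description might cover):

```java
// Sketch of a more descriptive option definition (wording illustrative only).
public static final ConfigOption<Boolean> AUTO_COMPACTION =
        key("auto-compaction")
                .booleanType()
                .defaultValue(false)
                .withDescription("Whether to enable automatic compaction in the streaming sink." +
                        " Data is written to temporary files that are not visible to readers;" +
                        " when a checkpoint completes, the files produced within that checkpoint" +
                        " are compacted into larger target files, and only the compacted files" +
                        " become visible.");
```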

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -127,108 +127,118 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
 	}
 
 	private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
-		RowDataPartitionComputer computer = new RowDataPartitionComputer(
+		if (sinkContext.isBounded()) {
+			return createBatchSink(dataStream, sinkContext);
+		} else {
+			if (overwrite) {
+				throw new IllegalStateException("Streaming mode not support overwrite.");
+			}
+
+			return createStreamingSink(dataStream, sinkContext);
+		}
+	}
+
+	private RowDataPartitionComputer partitionComputer() {
+		return new RowDataPartitionComputer(
 				defaultPartName,
 				schema.getFieldNames(),
 				schema.getFieldDataTypes(),
 				partitionKeys.toArray(new String[0]));
+	}
 
-		EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
-		OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+	private DataStreamSink<RowData> createBatchSink(
+			DataStream<RowData> inputStream, Context sinkContext) {
+		FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
+		builder.setPartitionComputer(partitionComputer());
+		builder.setDynamicGrouped(dynamicGrouping);
+		builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
+		builder.setFormatFactory(createOutputFormatFactory(sinkContext));
+		builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path));
+		builder.setOverwrite(overwrite);
+		builder.setStaticPartitions(staticPartitions);
+		builder.setTempPath(toStagingPath());
+		builder.setOutputFileConfig(OutputFileConfig.builder()
 				.withPartPrefix("part-" + UUID.randomUUID().toString())
-				.build();
+				.build());
+		return inputStream.writeUsingOutputFormat(builder.build())
+				.setParallelism(inputStream.getParallelism());
+	}
+
+	private DataStreamSink<?> createStreamingSink(
+			DataStream<RowData> dataStream, Context sinkContext) {
 		FileSystemFactory fsFactory = FileSystem::get;
+		RowDataPartitionComputer computer = partitionComputer();
 
-		if (sinkContext.isBounded()) {
-			FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
-			builder.setPartitionComputer(computer);
-			builder.setDynamicGrouped(dynamicGrouping);
-			builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
-			builder.setFormatFactory(createOutputFormatFactory(sinkContext));
-			builder.setMetaStoreFactory(metaStoreFactory);
-			builder.setFileSystemFactory(fsFactory);
-			builder.setOverwrite(overwrite);
-			builder.setStaticPartitions(staticPartitions);
-			builder.setTempPath(toStagingPath());
-			builder.setOutputFileConfig(outputFileConfig);
-			return dataStream.writeUsingOutputFormat(builder.build())
-					.setParallelism(dataStream.getParallelism());
+		boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
+		Object writer = createWriter(sinkContext);
+		boolean isEncoder = writer instanceof Encoder;
+		TableBucketAssigner assigner = new TableBucketAssigner(computer);
+		TableRollingPolicy rollingPolicy = new TableRollingPolicy(
+				!isEncoder || autoCompaction,
+				tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
+				tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
+
+		String randomPrefix = "part-" + UUID.randomUUID().toString();
+		OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
+		fileNamingBuilder = autoCompaction ?
+				fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix)) :
+				fileNamingBuilder.withPartPrefix(randomPrefix);

Review comment:
       Why do we call `withPartPrefix` twice?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -127,108 +127,118 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
 	}
 
 	private DataStreamSink<?> consume(DataStream<RowData> dataStream, Context sinkContext) {
-		RowDataPartitionComputer computer = new RowDataPartitionComputer(
+		if (sinkContext.isBounded()) {
+			return createBatchSink(dataStream, sinkContext);
+		} else {
+			if (overwrite) {
+				throw new IllegalStateException("Streaming mode not support overwrite.");
+			}
+
+			return createStreamingSink(dataStream, sinkContext);
+		}
+	}
+
+	private RowDataPartitionComputer partitionComputer() {
+		return new RowDataPartitionComputer(
 				defaultPartName,
 				schema.getFieldNames(),
 				schema.getFieldDataTypes(),
 				partitionKeys.toArray(new String[0]));
+	}
 
-		EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
-		OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+	private DataStreamSink<RowData> createBatchSink(
+			DataStream<RowData> inputStream, Context sinkContext) {
+		FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
+		builder.setPartitionComputer(partitionComputer());
+		builder.setDynamicGrouped(dynamicGrouping);
+		builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
+		builder.setFormatFactory(createOutputFormatFactory(sinkContext));
+		builder.setMetaStoreFactory(new EmptyMetaStoreFactory(path));
+		builder.setOverwrite(overwrite);
+		builder.setStaticPartitions(staticPartitions);
+		builder.setTempPath(toStagingPath());
+		builder.setOutputFileConfig(OutputFileConfig.builder()
 				.withPartPrefix("part-" + UUID.randomUUID().toString())
-				.build();
+				.build());
+		return inputStream.writeUsingOutputFormat(builder.build())
+				.setParallelism(inputStream.getParallelism());
+	}
+
+	private DataStreamSink<?> createStreamingSink(
+			DataStream<RowData> dataStream, Context sinkContext) {
 		FileSystemFactory fsFactory = FileSystem::get;
+		RowDataPartitionComputer computer = partitionComputer();
 
-		if (sinkContext.isBounded()) {
-			FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
-			builder.setPartitionComputer(computer);
-			builder.setDynamicGrouped(dynamicGrouping);
-			builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
-			builder.setFormatFactory(createOutputFormatFactory(sinkContext));
-			builder.setMetaStoreFactory(metaStoreFactory);
-			builder.setFileSystemFactory(fsFactory);
-			builder.setOverwrite(overwrite);
-			builder.setStaticPartitions(staticPartitions);
-			builder.setTempPath(toStagingPath());
-			builder.setOutputFileConfig(outputFileConfig);
-			return dataStream.writeUsingOutputFormat(builder.build())
-					.setParallelism(dataStream.getParallelism());
+		boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
+		Object writer = createWriter(sinkContext);
+		boolean isEncoder = writer instanceof Encoder;
+		TableBucketAssigner assigner = new TableBucketAssigner(computer);
+		TableRollingPolicy rollingPolicy = new TableRollingPolicy(
+				!isEncoder || autoCompaction,
+				tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
+				tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
+
+		String randomPrefix = "part-" + UUID.randomUUID().toString();
+		OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder = OutputFileConfig.builder();
+		fileNamingBuilder = autoCompaction ?
+				fileNamingBuilder.withPartPrefix(convertToUncompacted(randomPrefix)) :
+				fileNamingBuilder.withPartPrefix(randomPrefix);

Review comment:
       Never mind... Just noticed it's for different cases.







[GitHub] [flink] JingsongLi merged pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #13852:
URL: https://github.com/apache/flink/pull/13852


   





[GitHub] [flink] flinkbot edited a comment on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>








[GitHub] [flink] JingsongLi commented on a change in pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13852:
URL: https://github.com/apache/flink/pull/13852#discussion_r515799302



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();

Review comment:
       We don't need to clear here.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -138,15 +154,28 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
 			return dataStream.writeUsingOutputFormat(builder.build())
 					.setParallelism(dataStream.getParallelism());
 		} else {
+			if (overwrite) {
+				throw new IllegalStateException("Streaming mode not support overwrite.");
+			}
+
+			boolean autoCompaction = tableOptions.getBoolean(FileSystemOptions.AUTO_COMPACTION);
 			Object writer = createWriter(sinkContext);
+			boolean isEncoder = writer instanceof Encoder;
 			TableBucketAssigner assigner = new TableBucketAssigner(computer);
 			TableRollingPolicy rollingPolicy = new TableRollingPolicy(
-					!(writer instanceof Encoder),
+					!isEncoder || autoCompaction,
 					tableOptions.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
 					tableOptions.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
 
+			if (autoCompaction) {
+				outputFileConfig = OutputFileConfig.builder()

Review comment:
       I think I can keep the builder as a local field.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);

Review comment:
       We cannot...
   - We never know when the files will be deleted.
   - We never know when `endInput` will be invoked, either.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);

Review comment:
       I think we can.

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Streaming sink File Compaction ITCase base, test checkpoint.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table",  stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);

Review comment:
       We cannot assert just one file, because `ParallelFiniteTestSource` may spread data across multiple checkpoints.
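
    A relaxed form of the assertion, sketched under that reasoning (illustrative only, not necessarily the final test):

```java
// Sketch: the number of compacted files depends on how many checkpoints the
// finite source spans, so only assert that at least one compacted file exists.
File[] files = new File(URI.create(resultPath)).listFiles(
        (dir, name) -> name.startsWith("compacted-part-"));
assertTrue(Arrays.toString(files), files.length >= 1);
```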

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FileCompactionITCaseBase.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.scala.DataStream;
+import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base ITCase for streaming sink file compaction, running with checkpointing enabled.
+ */
+public abstract class FileCompactionITCaseBase extends StreamingTestBase {
+
+	@Rule
+	public Timeout timeoutPerTest = Timeout.seconds(60);
+
+	private String resultPath;
+
+	private List<Row> rows;
+
+	@Before
+	public void init() throws IOException {
+		resultPath = tempFolder().newFolder().toURI().toString();
+		clear();
+
+		env().setParallelism(3);
+		env().enableCheckpointing(100);
+
+		rows = new ArrayList<>();
+		for (int i = 0; i < 100; i++) {
+			rows.add(Row.of(i, String.valueOf(i % 10), String.valueOf(i)));
+		}
+
+		DataStream<Row> stream = new DataStream<>(env().getJavaEnv().addSource(
+				new ParallelFiniteTestSource<>(rows),
+				new RowTypeInfo(
+						new TypeInformation[] {Types.INT, Types.STRING, Types.STRING},
+						new String[] {"a", "b", "c"})));
+
+		tEnv().createTemporaryView("my_table", stream);
+	}
+
+	@After
+	public void clear() throws IOException {
+		FileUtils.deleteDirectory(new File(URI.create(resultPath)));
+	}
+
+	protected abstract String format();
+
+	@Test
+	public void testNonPartition() throws Exception {
+		tEnv().executeSql("CREATE TABLE sink_table (a int, b string, c string) with (" + options() + ")");
+		tEnv().executeSql("insert into sink_table select * from my_table").await();
+
+		List<Row> results = toListAndClose(tEnv().executeSql("select * from sink_table").collect());
+		results.sort(Comparator.comparingInt(o -> (Integer) o.getField(0)));
+		assertEquals(rows, results);
+
+		File[] files = new File(URI.create(resultPath)).listFiles(
+				(dir, name) -> name.startsWith("compacted-part-"));
+		assertEquals(Arrays.toString(files), 1, files.length);

Review comment:
       Found a bug in `endInput`.
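       The PR's `endInput` implementation is not quoted in this thread, so the sketch below is only a hypothetical illustration of what an `endInput` fix in a compacting writer typically has to cover: flushing whatever is still buffered when a bounded input finishes, so the tail of the data still reaches compaction. All names below (`CompactingWriterSketch`, `pending`) are made up for illustration and are not the PR's classes:

       ```java
       import java.util.ArrayList;
       import java.util.List;

       import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
       import org.apache.flink.streaming.api.operators.BoundedOneInput;
       import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
       import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

       // Hypothetical writer sketch: the interesting part is BoundedOneInput.endInput(),
       // which must emit everything still pending when a bounded input ends, otherwise
       // records written after the last checkpoint never become a compacted file.
       public class CompactingWriterSketch extends AbstractStreamOperator<String>
               implements OneInputStreamOperator<String, String>, BoundedOneInput {

           private transient List<String> pending;

           @Override
           public void open() throws Exception {
               super.open();
               pending = new ArrayList<>();
           }

           @Override
           public void processElement(StreamRecord<String> element) {
               // stand-in for writing to an in-progress file
               pending.add(element.getValue());
           }

           @Override
           public void endInput() {
               // Flush the tail of the data before the job finishes, so downstream
               // compaction sees the last, not-yet-checkpointed output as well.
               for (String record : pending) {
                   output.collect(new StreamRecord<>(record));
               }
               pending.clear();
           }
       }
       ```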




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659",
       "triggerID" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e8dc8dcf6c81859a3bab198a560e239de6269fa6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8664",
       "triggerID" : "e8dc8dcf6c81859a3bab198a560e239de6269fa6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650) 
   * cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659) 
   * e8dc8dcf6c81859a3bab198a560e239de6269fa6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8664) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-720307971






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719283002


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650",
       "triggerID" : "eaa32dd4c7689a8241956b8441105d760bd1747a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659",
       "triggerID" : "cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e8dc8dcf6c81859a3bab198a560e239de6269fa6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e8dc8dcf6c81859a3bab198a560e239de6269fa6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * eaa32dd4c7689a8241956b8441105d760bd1747a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8650) 
   * cfc8a594e8c116bb711c13c05cf4ef62a5bdc98a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8659) 
   * e8dc8dcf6c81859a3bab198a560e239de6269fa6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13852: [FLINK-19875][table] Integrate file compaction to filesystem connector

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13852:
URL: https://github.com/apache/flink/pull/13852#issuecomment-719248005


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit eaa32dd4c7689a8241956b8441105d760bd1747a (Fri Oct 30 06:07:18 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org