Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/09 05:50:53 UTC
[flink] 02/02: [FLINK-18130][hive][fs-connector] File name conflict
for different jobs in filesystem/hive sink ()
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ab6cf4083c84bf8c04564c74f6d9f9783adf5e21
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 9 13:48:42 2020 +0800
[FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink ()
This closes #12485
---
.../flink/connectors/hive/HiveTableSink.java | 6 +-
.../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++++++++++++++++++++++
.../planner/runtime/FileSystemITCaseBase.scala | 37 +++++++++++
.../stream/sql/StreamFileSystemITCaseBase.scala | 6 +-
.../table/filesystem/FileSystemTableSink.java | 8 +++
.../flink/table/filesystem/PartitionLoader.java | 9 +--
.../table/filesystem/PartitionTempFileManager.java | 2 +-
7 files changed, 132 insertions(+), 11 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
isCompressed);
String extension = Utilities.getFileExtension(jobConf, isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
- extension = extension == null ? "" : extension;
OutputFileConfig outputFileConfig = OutputFileConfig.builder()
- .withPartSuffix(extension).build();
+ .withPartPrefix("part-" + UUID.randomUUID().toString())
+ .withPartSuffix(extension == null ? "" : extension)
+ .build();
if (isBounded) {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new HiveRowPartitionComputer(
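The hunk above is the heart of the fix: each job instance now stamps its
part files with a freshly generated UUID prefix, so two jobs writing into
the same Hive table can no longer produce identical file names. A minimal
self-contained sketch of the naming scheme, assuming a hypothetical ".orc"
suffix (in this commit the real suffix comes from Utilities.getFileExtension):

    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;

    import java.util.UUID;

    public class UniquePartPrefixSketch {
        public static void main(String[] args) {
            // The prefix is chosen once per job, so all part files of one job
            // share it while files of any other job are guaranteed to differ.
            OutputFileConfig config = OutputFileConfig.builder()
                    .withPartPrefix("part-" + UUID.randomUUID().toString())
                    .withPartSuffix(".orc") // illustrative suffix, not from this commit
                    .build();
            System.out.println(config.getPartPrefix() + "<middle>" + config.getPartSuffix());
        }
    }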
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
}
}
+ @Test
+ public void testBatchAppend() {
+ TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+ tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tEnv.useCatalog(hiveCatalog.getName());
+ tEnv.executeSql("create database db1");
+ tEnv.useDatabase("db1");
+ try {
+ tEnv.executeSql("create table append_table (i int, j int)");
+ TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
+ TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
+ ArrayList<Row> rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
+ rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+ Assert.assertEquals(Arrays.asList(Row.of(1, 1), Row.of(2, 2)), rows);
+ } finally {
+ tEnv.executeSql("drop database db1 cascade");
+ }
+ }
+
@Test(timeout = 120000)
public void testDefaultSerPartStreamingWrite() throws Exception {
testStreamingWrite(true, false, true, this::checkSuccessFiles);
@@ -253,6 +275,34 @@ public class HiveTableSinkITCase {
testStreamingWrite(false, true, false, (p) -> {});
}
+ @Test(timeout = 120000)
+ public void testStreamingAppend() throws Exception {
+ testStreamingWrite(false, false, false, (p) -> {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+ tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ tEnv.useCatalog(hiveCatalog.getName());
+
+ TableEnvUtil.execInsertSqlAndWaitResult(
+ tEnv,
+ "insert into db1.sink_table select 6,'a','b','2020-05-03','12'");
+
+ assertBatch("db1.sink_table", Arrays.asList(
+ "1,a,b,2020-05-03,7",
+ "1,a,b,2020-05-03,7",
+ "2,p,q,2020-05-03,8",
+ "2,p,q,2020-05-03,8",
+ "3,x,y,2020-05-03,9",
+ "3,x,y,2020-05-03,9",
+ "4,x,y,2020-05-03,10",
+ "4,x,y,2020-05-03,10",
+ "5,x,y,2020-05-03,11",
+ "5,x,y,2020-05-03,11",
+ "6,a,b,2020-05-03,12"));
+ });
+ }
+
private void checkSuccessFiles(String path) {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);
@@ -317,6 +367,18 @@ public class HiveTableSinkITCase {
tEnv.sqlQuery("select * from my_table"),
"sink_table");
+ assertBatch("db1.sink_table", Arrays.asList(
+ "1,a,b,2020-05-03,7",
+ "1,a,b,2020-05-03,7",
+ "2,p,q,2020-05-03,8",
+ "2,p,q,2020-05-03,8",
+ "3,x,y,2020-05-03,9",
+ "3,x,y,2020-05-03,9",
+ "4,x,y,2020-05-03,10",
+ "4,x,y,2020-05-03,10",
+ "5,x,y,2020-05-03,11",
+ "5,x,y,2020-05-03,11"));
+
// using batch table env to query.
List<String> results = new ArrayList<>();
TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -346,6 +408,19 @@ public class HiveTableSinkITCase {
}
}
+ private void assertBatch(String table, List<String> expected) {
+ // using batch table env to query.
+ List<String> results = new ArrayList<>();
+ TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+ batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ batchTEnv.useCatalog(hiveCatalog.getName());
+ batchTEnv.executeSql("select * from " + table).collect()
+ .forEachRemaining(r -> results.add(r.toString()));
+ results.sort(String::compareTo);
+ expected.sort(String::compareTo);
+ Assert.assertEquals(expected, results);
+ }
+
private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 3b7fd46..12981e9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -236,6 +236,43 @@ trait FileSystemITCaseBase {
row(19, 3, "x19")
))
}
+
+ @Test
+ def testInsertAppend(): Unit = {
+ tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+ tableEnv.execute("test1")
+
+ tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+ tableEnv.execute("test2")
+
+ check(
+ "select y, b, x from partitionedTable where a=3",
+ Seq(
+ row(17, 1, "x17"),
+ row(18, 2, "x18"),
+ row(19, 3, "x19"),
+ row(17, 1, "x17"),
+ row(18, 2, "x18"),
+ row(19, 3, "x19")
+ ))
+ }
+
+ @Test
+ def testInsertOverwrite(): Unit = {
+ tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+ tableEnv.execute("test1")
+
+ tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+ tableEnv.execute("test2")
+
+ check(
+ "select y, b, x from partitionedTable where a=3",
+ Seq(
+ row(17, 1, "x17"),
+ row(18, 2, "x18"),
+ row(19, 3, "x19")
+ ))
+ }
}
object FileSystemITCaseBase {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 2a61be8..5b78fee 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSink
import org.apache.flink.types.Row
import org.junit.Assert.assertEquals
-import org.junit.Before
+import org.junit.{Before, Test}
import scala.collection.Seq
@@ -55,4 +55,8 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
sink.getAppendResults.sorted)
}
+
+ // Streaming mode does not support overwrite
+ @Test
+ override def testInsertOverwrite(): Unit = {}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 977aa3f..b612746 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -63,6 +64,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
@@ -126,6 +128,9 @@ public class FileSystemTableSink implements
partitionKeys.toArray(new String[0]));
EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
+ OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+ .withPartPrefix("part-" + UUID.randomUUID().toString())
+ .build();
if (isBounded) {
FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
@@ -137,6 +142,7 @@ public class FileSystemTableSink implements
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitions);
builder.setTempPath(toStagingPath());
+ builder.setOutputFileConfig(outputFileConfig);
return dataStream.writeUsingOutputFormat(builder.build())
.setParallelism(dataStream.getParallelism());
} else {
@@ -155,12 +161,14 @@ public class FileSystemTableSink implements
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
+ .withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
+ .withOutputFileConfig(outputFileConfig)
.withRollingPolicy(rollingPolicy);
}
return createStreamingSink(
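The generic filesystem sink applies the same prefix on both execution
paths: the bounded branch hands it to FileSystemOutputFormat.Builder, the
streaming branch attaches it to the StreamingFileSink builders. A rough
standalone sketch of the streaming case, using a plain string encoder and
a placeholder output path instead of this sink's internal writers:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    import java.util.UUID;

    public class StreamingSinkPrefixSketch {
        public static StreamingFileSink<String> build() {
            // "/tmp/sink-output" is a placeholder path for illustration.
            return StreamingFileSink
                    .forRowFormat(new Path("/tmp/sink-output"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withOutputFileConfig(OutputFileConfig.builder()
                            .withPartPrefix("part-" + UUID.randomUUID().toString())
                            .build())
                    .build();
        }
    }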
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
index 9e0b98d..9a8f8f6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
@@ -103,7 +103,7 @@ public class PartitionLoader implements Closeable {
}
/**
- * Moves files from srcDir to destDir. Delete files in destDir first when overwrite.
+ * Moves files from srcDir to destDir.
*/
private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
for (Path srcDir : srcDirs) {
@@ -113,12 +113,7 @@ public class PartitionLoader implements Closeable {
for (FileStatus srcFile : srcFiles) {
Path srcPath = srcFile.getPath();
Path destPath = new Path(destDir, srcPath.getName());
- int count = 1;
- while (!fs.rename(srcPath, destPath)) {
- String name = srcPath.getName() + "_copy_" + count;
- destPath = new Path(destDir, name);
- count++;
- }
+ fs.rename(srcPath, destPath);
}
}
}
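The deleted loop retried a failed rename under names like
"<name>_copy_1", which could silently keep colliding files from an
unrelated job. With per-job UUID prefixes such collisions cannot occur,
so a single rename is enough. A small sketch of why two jobs can no
longer contend for the same destination path (the checkpoint/task/file
fragments below are illustrative, not produced by this code):

    import java.util.UUID;

    public class NoCollisionSketch {
        public static void main(String[] args) {
            // Identical checkpoint, task, and counter on both jobs; the
            // per-job UUID prefix alone keeps the names distinct.
            String jobA = "part-" + UUID.randomUUID() + "-cp-1-task-0-file-0";
            String jobB = "part-" + UUID.randomUUID() + "-cp-1-task-0-file-0";
            System.out.println(jobA.equals(jobB)); // false
        }
    }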
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index b00bed3..8e225ff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -97,7 +97,7 @@ public class PartitionTempFileManager {
}
private String newFileName() {
- return String.format("%s%s-%s-file-%d%s",
+ return String.format("%s-%s-%s-file-%d%s",
outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
}
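The one-character format change adds an explicit "-" between the prefix
and the checkpoint name, which is needed now that the prefix is always
non-empty. An illustrative expansion of the new pattern (the concrete
fragments are assumptions, not output captured from this code):

    public class FileNameFormatSketch {
        public static void main(String[] args) {
            // %s-%s-%s-file-%d%s: prefix, checkpoint name, task name,
            // per-task counter, optional suffix (".orc" assumed here).
            String name = String.format("%s-%s-%s-file-%d%s",
                    "part-<uuid>", "cp-1", "task-0", 0, ".orc");
            System.out.println(name); // part-<uuid>-cp-1-task-0-file-0.orc
        }
    }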