You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/09 05:50:53 UTC

[flink] 02/02: [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab6cf4083c84bf8c04564c74f6d9f9783adf5e21
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 9 13:48:42 2020 +0800

    [FLINK-18130][hive][fs-connector] File name conflict for different jobs in filesystem/hive sink
    
    
    This closes #12485
---
 .../flink/connectors/hive/HiveTableSink.java       |  6 +-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 75 ++++++++++++++++++++++
 .../planner/runtime/FileSystemITCaseBase.scala     | 37 +++++++++++
 .../stream/sql/StreamFileSystemITCaseBase.scala    |  6 +-
 .../table/filesystem/FileSystemTableSink.java      |  8 +++
 .../flink/table/filesystem/PartitionLoader.java    |  9 +--
 .../table/filesystem/PartitionTempFileManager.java |  2 +-
 7 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index a4e3012..1cac9e0 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -77,6 +77,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_TIME_INTERVAL;
@@ -141,9 +142,10 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 					isCompressed);
 			String extension = Utilities.getFileExtension(jobConf, isCompressed,
 					(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
-			extension = extension == null ? "" : extension;
 			OutputFileConfig outputFileConfig = OutputFileConfig.builder()
-					.withPartSuffix(extension).build();
+					.withPartPrefix("part-" + UUID.randomUUID().toString())
+					.withPartSuffix(extension == null ? "" : extension)
+					.build();
 			if (isBounded) {
 				FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
 				builder.setPartitionComputer(new HiveRowPartitionComputer(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index a73bce9..7592a33 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -45,6 +45,8 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -60,6 +62,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -228,6 +231,25 @@ public class HiveTableSinkITCase {
 		}
 	}
 
+	@Test
+	public void testBatchAppend() {
+		TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE);
+		tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		tEnv.useCatalog(hiveCatalog.getName());
+		tEnv.executeSql("create database db1");
+		tEnv.useDatabase("db1");
+		try {
+			tEnv.executeSql("create table append_table (i int, j int)");
+			TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 1, 1");
+			TableEnvUtil.execInsertSqlAndWaitResult(tEnv, "insert into append_table select 2, 2");
+			ArrayList<Row> rows = Lists.newArrayList(tEnv.executeSql("select * from append_table").collect());
+			rows.sort(Comparator.comparingInt(o -> (int) o.getField(0)));
+			Assert.assertEquals(Arrays.asList(Row.of(1, 1), Row.of(2, 2)), rows);
+		} finally {
+			tEnv.executeSql("drop database db1 cascade");
+		}
+	}
+
 	@Test(timeout = 120000)
 	public void testDefaultSerPartStreamingWrite() throws Exception {
 		testStreamingWrite(true, false, true, this::checkSuccessFiles);
@@ -253,6 +275,34 @@ public class HiveTableSinkITCase {
 		testStreamingWrite(false, true, false, (p) -> {});
 	}
 
+	@Test(timeout = 120000)
+	public void testStreamingAppend() throws Exception {
+		testStreamingWrite(false, false, false, (p) -> {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+			StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+			tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+			tEnv.useCatalog(hiveCatalog.getName());
+
+			TableEnvUtil.execInsertSqlAndWaitResult(
+					tEnv,
+					"insert into db1.sink_table select 6,'a','b','2020-05-03','12'");
+
+			assertBatch("db1.sink_table", Arrays.asList(
+					"1,a,b,2020-05-03,7",
+					"1,a,b,2020-05-03,7",
+					"2,p,q,2020-05-03,8",
+					"2,p,q,2020-05-03,8",
+					"3,x,y,2020-05-03,9",
+					"3,x,y,2020-05-03,9",
+					"4,x,y,2020-05-03,10",
+					"4,x,y,2020-05-03,10",
+					"5,x,y,2020-05-03,11",
+					"5,x,y,2020-05-03,11",
+					"6,a,b,2020-05-03,12"));
+		});
+	}
+
 	private void checkSuccessFiles(String path) {
 		File basePath = new File(path, "d=2020-05-03");
 		Assert.assertEquals(5, basePath.list().length);
@@ -317,6 +367,18 @@ public class HiveTableSinkITCase {
 					tEnv.sqlQuery("select * from my_table"),
 					"sink_table");
 
+			assertBatch("db1.sink_table", Arrays.asList(
+					"1,a,b,2020-05-03,7",
+					"1,a,b,2020-05-03,7",
+					"2,p,q,2020-05-03,8",
+					"2,p,q,2020-05-03,8",
+					"3,x,y,2020-05-03,9",
+					"3,x,y,2020-05-03,9",
+					"4,x,y,2020-05-03,10",
+					"4,x,y,2020-05-03,10",
+					"5,x,y,2020-05-03,11",
+					"5,x,y,2020-05-03,11"));
+
 			// using batch table env to query.
 			List<String> results = new ArrayList<>();
 			TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
@@ -346,6 +408,19 @@ public class HiveTableSinkITCase {
 		}
 	}
 
+	private void assertBatch(String table, List<String> expected) {
+		// using batch table env to query.
+		List<String> results = new ArrayList<>();
+		TableEnvironment batchTEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
+		batchTEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+		batchTEnv.useCatalog(hiveCatalog.getName());
+		batchTEnv.executeSql("select * from " + table).collect()
+				.forEachRemaining(r -> results.add(r.toString()));
+		results.sort(String::compareTo);
+		expected.sort(String::compareTo);
+		Assert.assertEquals(expected, results);
+	}
+
 	private RowTypeInfo createHiveDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
 		CatalogTable catalogTable = createHiveCatalogTable(tableSchema, numPartCols);
 		hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index 3b7fd46..12981e9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -236,6 +236,43 @@ trait FileSystemITCaseBase {
         row(19, 3, "x19")
       ))
   }
+
+  @Test
+  def testInsertAppend(): Unit = {
+    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test1")
+
+    tableEnv.sqlUpdate("insert into partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test2")
+
+    check(
+      "select y, b, x from partitionedTable where a=3",
+      Seq(
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19"),
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19")
+      ))
+  }
+
+  @Test
+  def testInsertOverwrite(): Unit = {
+    tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test1")
+
+    tableEnv.sqlUpdate("insert overwrite partitionedTable select x, y, a, b from originalT")
+    tableEnv.execute("test2")
+
+    check(
+      "select y, b, x from partitionedTable where a=3",
+      Seq(
+        row(17, 1, "x17"),
+        row(18, 2, "x18"),
+        row(19, 3, "x19")
+      ))
+  }
 }
 
 object FileSystemITCaseBase {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 2a61be8..5b78fee 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSink
 import org.apache.flink.types.Row
 
 import org.junit.Assert.assertEquals
-import org.junit.Before
+import org.junit.{Before, Test}
 
 import scala.collection.Seq
 
@@ -55,4 +55,8 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
       expectedResult.map(TestSinkUtil.rowToString(_)).sorted,
       sink.getAppendResults.sorted)
   }
+
+  // Streaming mode does not support INSERT OVERWRITE, so the inherited test is overridden as a no-op.
+  @Test
+  override def testInsertOverwrite(): Unit = {}
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 977aa3f..b612746 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -63,6 +64,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_PARTITION_COMMIT_POLICY_KIND;
 import static org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
@@ -126,6 +128,9 @@ public class FileSystemTableSink implements
 				partitionKeys.toArray(new String[0]));
 
 		EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
+		OutputFileConfig outputFileConfig = OutputFileConfig.builder()
+				.withPartPrefix("part-" + UUID.randomUUID().toString())
+				.build();
 
 		if (isBounded) {
 			FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
@@ -137,6 +142,7 @@ public class FileSystemTableSink implements
 			builder.setOverwrite(overwrite);
 			builder.setStaticPartitions(staticPartitions);
 			builder.setTempPath(toStagingPath());
+			builder.setOutputFileConfig(outputFileConfig);
 			return dataStream.writeUsingOutputFormat(builder.build())
 					.setParallelism(dataStream.getParallelism());
 		} else {
@@ -155,12 +161,14 @@ public class FileSystemTableSink implements
 				bucketsBuilder = StreamingFileSink.forRowFormat(
 						path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
 						.withBucketAssigner(assigner)
+						.withOutputFileConfig(outputFileConfig)
 						.withRollingPolicy(rollingPolicy);
 			} else {
 				//noinspection unchecked
 				bucketsBuilder = StreamingFileSink.forBulkFormat(
 						path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
 						.withBucketAssigner(assigner)
+						.withOutputFileConfig(outputFileConfig)
 						.withRollingPolicy(rollingPolicy);
 			}
 			return createStreamingSink(
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
index 9e0b98d..9a8f8f6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionLoader.java
@@ -103,7 +103,7 @@ public class PartitionLoader implements Closeable {
 	}
 
 	/**
-	 * Moves files from srcDir to destDir. Delete files in destDir first when overwrite.
+	 * Moves files from srcDir to destDir.
 	 */
 	private void renameFiles(List<Path> srcDirs, Path destDir) throws Exception {
 		for (Path srcDir : srcDirs) {
@@ -113,12 +113,7 @@ public class PartitionLoader implements Closeable {
 					for (FileStatus srcFile : srcFiles) {
 						Path srcPath = srcFile.getPath();
 						Path destPath = new Path(destDir, srcPath.getName());
-						int count = 1;
-						while (!fs.rename(srcPath, destPath)) {
-							String name = srcPath.getName() + "_copy_" + count;
-							destPath = new Path(destDir, name);
-							count++;
-						}
+						fs.rename(srcPath, destPath);
 					}
 				}
 			}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
index b00bed3..8e225ff 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTempFileManager.java
@@ -97,7 +97,7 @@ public class PartitionTempFileManager {
 	}
 
 	private String newFileName() {
-		return String.format("%s%s-%s-file-%d%s",
+		return String.format("%s-%s-%s-file-%d%s",
 				outputFileConfig.getPartPrefix(), checkpointName(checkpointId),
 				taskName(taskNumber), nameCounter++, outputFileConfig.getPartSuffix());
 	}