Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/13 04:09:55 UTC

[flink] branch release-1.11 updated: [FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b728f22  [FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table
b728f22 is described below

commit b728f22e9f95a0ea8952aa234150718d796e72ef
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Aug 13 12:09:15 2020 +0800

    [FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table
    
    This closes #13130
---
 flink-connectors/flink-connector-hive/pom.xml      |  2 ++
 .../hive/write/HiveBulkWriterFactory.java          |  3 ++-
 .../flink/connectors/hive/HiveTableSinkITCase.java | 30 ++++++++++++++++------
 3 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index fe261c6..8249323 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -1064,6 +1064,8 @@ under the License.
 			<properties>
 				<hive.version>3.1.1</hive.version>
 				<derby.version>10.14.1.0</derby.version>
+                <!-- need a hadoop version that fixes HADOOP-14683 -->
+				<hivemetastore.hadoop.version>2.8.2</hivemetastore.hadoop.version>
 			</properties>
 
 			<dependencyManagement>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
index 9a22d56..1fb7193 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
@@ -51,7 +51,8 @@ public class HiveBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<
 
 			@Override
 			public long getSize() throws IOException {
-				return fs.getFileStatus(inProgressPath).getLen();
+				// it's possible the in-progress file hasn't yet been created, due to writer lazy init or data buffering
+				return fs.exists(inProgressPath) ? fs.getFileStatus(inProgressPath).getLen() : 0;
 			}
 
 			@Override
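
The one-line change above is the core of the fix: getSize() previously assumed the in-progress file already existed, but with lazy writer initialization (or data still buffered in memory) the first size query can arrive before anything has been written to disk, and getFileStatus then throws FileNotFoundException. A minimal standalone sketch of the guard, using the Hadoop FileSystem API (the class name, path, and main method here are illustrative, not from the commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class InProgressFileSize {

        // Returns the current length of an in-progress file, treating a
        // not-yet-created file as empty instead of failing, mirroring the
        // guard added to HiveBulkWriterFactory above.
        static long sizeOrZero(FileSystem fs, Path inProgressPath) throws IOException {
            // The writer may initialize lazily or buffer data in memory, so
            // the file may not exist yet when the first size check happens.
            return fs.exists(inProgressPath) ? fs.getFileStatus(inProgressPath).getLen() : 0L;
        }

        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path path = new Path("/tmp/part-0-0.inprogress"); // illustrative path
            System.out.println("current size: " + sizeOrZero(fs, path));
        }
    }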
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 7592a33..b9819ad 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -252,32 +252,46 @@ public class HiveTableSinkITCase {
 
 	@Test(timeout = 120000)
 	public void testDefaultSerPartStreamingWrite() throws Exception {
-		testStreamingWrite(true, false, true, this::checkSuccessFiles);
+		testStreamingWrite(true, false, "textfile", this::checkSuccessFiles);
 	}
 
 	@Test(timeout = 120000)
 	public void testPartStreamingWrite() throws Exception {
-		testStreamingWrite(true, false, false, this::checkSuccessFiles);
+		testStreamingWrite(true, false, "parquet", this::checkSuccessFiles);
+		// vector orc writer only works with Hive 3.x
+		if (hiveCatalog.getHiveVersion().startsWith("3.")) {
+			testStreamingWrite(true, false, "orc", this::checkSuccessFiles);
+		}
 	}
 
 	@Test(timeout = 120000)
 	public void testNonPartStreamingWrite() throws Exception {
-		testStreamingWrite(false, false, false, (p) -> {});
+		testStreamingWrite(false, false, "parquet", (p) -> {});
+		// vector orc writer only works with Hive 3.x
+		if (hiveCatalog.getHiveVersion().startsWith("3.")) {
+			testStreamingWrite(false, false, "orc", (p) -> {});
+		}
 	}
 
 	@Test(timeout = 120000)
 	public void testPartStreamingMrWrite() throws Exception {
-		testStreamingWrite(true, true, false, this::checkSuccessFiles);
+		testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
+		if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+			testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
+		}
 	}
 
 	@Test(timeout = 120000)
 	public void testNonPartStreamingMrWrite() throws Exception {
-		testStreamingWrite(false, true, false, (p) -> {});
+		testStreamingWrite(false, true, "parquet", (p) -> {});
+		if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+			testStreamingWrite(false, true, "orc", (p) -> {});
+		}
 	}
 
 	@Test(timeout = 120000)
 	public void testStreamingAppend() throws Exception {
-		testStreamingWrite(false, false, false, (p) -> {
+		testStreamingWrite(false, false, "parquet", (p) -> {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 			env.setParallelism(1);
 			StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
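
The tests above repeat the same Hive-version guards: the Flink-native vectorized Orc writer is only exercised on Hive 3.x, while the MR-based Orc writer is exercised on every version except Hive 2.x. A hypothetical helper, not part of this commit, that captures both guards in one place (the tests inline the checks directly):

    public class OrcTestGuard {

        // Sketch of the version gating used in the tests above: the
        // vectorized (non-MR) Orc writer needs Hive 3.x, while the MR-based
        // writer is skipped only on Hive 2.x.
        static boolean shouldTestOrc(String hiveVersion, boolean useMr) {
            return useMr ? !hiveVersion.startsWith("2.") : hiveVersion.startsWith("3.");
        }

        public static void main(String[] args) {
            System.out.println(shouldTestOrc("1.1.0", true));  // true: MR writer covers Hive 1.x
            System.out.println(shouldTestOrc("2.3.4", true));  // false
            System.out.println(shouldTestOrc("2.3.4", false)); // false
            System.out.println(shouldTestOrc("3.1.1", false)); // true: vectorized writer needs 3.x
        }
    }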
@@ -316,7 +330,7 @@ public class HiveTableSinkITCase {
 	private void testStreamingWrite(
 			boolean part,
 			boolean useMr,
-			boolean defaultSer,
+			String format,
 			Consumer<String> pathConsumer) throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
@@ -355,7 +369,7 @@ public class HiveTableSinkITCase {
 					(part ? "" : ",d string,e string") +
 					") " +
 					(part ? "partitioned by (d string,e string) " : "") +
-					(defaultSer ? "" : " stored as parquet") +
+					" stored as " + format +
 					" TBLPROPERTIES (" +
 					"'" + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00'," +
 					"'" + SINK_PARTITION_COMMIT_DELAY.key() + "'='1h'," +