You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/08/13 04:09:55 UTC
[flink] branch release-1.11 updated: [FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b728f22 [FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table
b728f22 is described below
commit b728f22e9f95a0ea8952aa234150718d796e72ef
Author: Rui Li <li...@apache.org>
AuthorDate: Thu Aug 13 12:09:15 2020 +0800
[FLINK-18659][hive][orc] Fix streaming write for Hive 1.x Orc table
This closes #13130
---
flink-connectors/flink-connector-hive/pom.xml | 2 ++
.../hive/write/HiveBulkWriterFactory.java | 3 ++-
.../flink/connectors/hive/HiveTableSinkITCase.java | 30 ++++++++++++++++------
3 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index fe261c6..8249323 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -1064,6 +1064,8 @@ under the License.
<properties>
<hive.version>3.1.1</hive.version>
<derby.version>10.14.1.0</derby.version>
+ <!-- need a hadoop version that fixes HADOOP-14683 -->
+ <hivemetastore.hadoop.version>2.8.2</hivemetastore.hadoop.version>
</properties>
<dependencyManagement>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
index 9a22d56..1fb7193 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveBulkWriterFactory.java
@@ -51,7 +51,8 @@ public class HiveBulkWriterFactory implements HadoopPathBasedBulkWriter.Factory<
@Override
public long getSize() throws IOException {
- return fs.getFileStatus(inProgressPath).getLen();
+ // it's possible the in-progress file hasn't yet been created, due to writer lazy init or data buffering
+ return fs.exists(inProgressPath) ? fs.getFileStatus(inProgressPath).getLen() : 0;
}
@Override
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 7592a33..b9819ad 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -252,32 +252,46 @@ public class HiveTableSinkITCase {
@Test(timeout = 120000)
public void testDefaultSerPartStreamingWrite() throws Exception {
- testStreamingWrite(true, false, true, this::checkSuccessFiles);
+ testStreamingWrite(true, false, "textfile", this::checkSuccessFiles);
}
@Test(timeout = 120000)
public void testPartStreamingWrite() throws Exception {
- testStreamingWrite(true, false, false, this::checkSuccessFiles);
+ testStreamingWrite(true, false, "parquet", this::checkSuccessFiles);
+ // vector orc writer only works with Hive 3.x
+ if (hiveCatalog.getHiveVersion().startsWith("3.")) {
+ testStreamingWrite(true, false, "orc", this::checkSuccessFiles);
+ }
}
@Test(timeout = 120000)
public void testNonPartStreamingWrite() throws Exception {
- testStreamingWrite(false, false, false, (p) -> {});
+ testStreamingWrite(false, false, "parquet", (p) -> {});
+ // vector orc writer only works with Hive 3.x
+ if (hiveCatalog.getHiveVersion().startsWith("3.")) {
+ testStreamingWrite(false, false, "orc", (p) -> {});
+ }
}
@Test(timeout = 120000)
public void testPartStreamingMrWrite() throws Exception {
- testStreamingWrite(true, true, false, this::checkSuccessFiles);
+ testStreamingWrite(true, true, "parquet", this::checkSuccessFiles);
+ if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+ testStreamingWrite(true, true, "orc", this::checkSuccessFiles);
+ }
}
@Test(timeout = 120000)
public void testNonPartStreamingMrWrite() throws Exception {
- testStreamingWrite(false, true, false, (p) -> {});
+ testStreamingWrite(false, true, "parquet", (p) -> {});
+ if (!hiveCatalog.getHiveVersion().startsWith("2.")) {
+ testStreamingWrite(false, true, "orc", (p) -> {});
+ }
}
@Test(timeout = 120000)
public void testStreamingAppend() throws Exception {
- testStreamingWrite(false, false, false, (p) -> {
+ testStreamingWrite(false, false, "parquet", (p) -> {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
@@ -316,7 +330,7 @@ public class HiveTableSinkITCase {
private void testStreamingWrite(
boolean part,
boolean useMr,
- boolean defaultSer,
+ String format,
Consumer<String> pathConsumer) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
@@ -355,7 +369,7 @@ public class HiveTableSinkITCase {
(part ? "" : ",d string,e string") +
") " +
(part ? "partitioned by (d string,e string) " : "") +
- (defaultSer ? "" : " stored as parquet") +
+ " stored as " + format +
" TBLPROPERTIES (" +
"'" + PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN.key() + "'='$d $e:00:00'," +
"'" + SINK_PARTITION_COMMIT_DELAY.key() + "'='1h'," +