Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/11 02:39:03 UTC
[incubator-inlong] branch master updated: [INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7800dee [INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)
7800dee is described below
commit 7800dee6ff2c88aa03fdb34f2f6e5fd83d7d53bb
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Fri Feb 11 10:38:58 2022 +0800
[INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
inlong-sort/pom.xml | 4 +
.../inlong/sort/protocol/sink/HiveSinkInfo.java | 9 +-
inlong-sort/sort-single-tenant/pom.xml | 6 +
.../inlong/sort/singletenant/flink/Entrance.java | 26 ++-
.../hive/HiveSinkWithoutPartitionTestCase.java | 246 +++++++++++++++++++++
5 files changed, 279 insertions(+), 12 deletions(-)
diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index a57e9a2..14cbd4f 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -301,6 +301,10 @@
<groupId>xml-apis</groupId>
<artifactId>xml-apis</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index c4ff8d3..89ee544 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.protocol.sink;
+import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -188,7 +189,7 @@ public class HiveSinkInfo extends SinkInfo {
public interface HiveFileFormat {
}
- public static class TextFileFormat implements HiveFileFormat {
+ public static class TextFileFormat implements HiveFileFormat, Serializable {
@JsonProperty("splitter")
private final Character splitter;
@@ -220,7 +221,7 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class OrcFileFormat implements HiveFileFormat {
+ public static class OrcFileFormat implements HiveFileFormat, Serializable {
@JsonProperty("batch_size")
private final int batchSize;
@@ -236,7 +237,7 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class SequenceFileFormat implements HiveFileFormat {
+ public static class SequenceFileFormat implements HiveFileFormat, Serializable {
@JsonProperty("splitter")
private final Character splitter;
@@ -262,7 +263,7 @@ public class HiveSinkInfo extends SinkInfo {
}
}
- public static class ParquetFileFormat implements HiveFileFormat {
+ public static class ParquetFileFormat implements HiveFileFormat, Serializable {
public ParquetFileFormat() {
}
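For context on the Serializable change above: Entrance passes the HiveSinkInfo into HiveWriter, and Flink distributes user functions and everything they reference to task managers via Java serialization, which is likely why every HiveFileFormat implementation must now implement Serializable as well. A minimal round-trip sketch (TextFileFormat and its splitter constructor come from this file; the class name and printout below are illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;

    public class FormatSerializationSketch {
        public static void main(String[] args) throws Exception {
            TextFileFormat format = new TextFileFormat('\t');

            // Write the object out; without Serializable on the format
            // classes this step fails with NotSerializableException.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(format);
            }

            // Read it back to confirm the round trip succeeds.
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                TextFileFormat restored = (TextFileFormat) in.readObject();
                System.out.println("round trip ok: " + (restored != null));
            }
        }
    }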
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 4f32c57..c2d8115 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -117,6 +117,12 @@
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 479da10..cfbd134 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -127,14 +127,24 @@ public class Entrance {
Preconditions.checkState(sinkInfo instanceof HiveSinkInfo);
HiveSinkInfo hiveSinkInfo = (HiveSinkInfo) sinkInfo;
- sourceStream
- .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
- .uid(Constants.SINK_UID)
- .name("Hive Sink")
- .setParallelism(sinkParallelism)
- .addSink(new HiveCommitter(config, hiveSinkInfo))
- .name("Hive Committer")
- .setParallelism(1);
+ if (hiveSinkInfo.getPartitions().length == 0) {
+ // The committer operator is not necessary if the table has no partitions.
+ sourceStream
+ .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
+ .uid(Constants.SINK_UID)
+ .name("Hive Sink")
+ .setParallelism(sinkParallelism);
+ } else {
+ sourceStream
+ .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
+ .uid(Constants.SINK_UID)
+ .name("Hive Sink")
+ .setParallelism(sinkParallelism)
+ .addSink(new HiveCommitter(config, hiveSinkInfo))
+ .name("Hive Committer")
+ .setParallelism(1);
+ }
+
break;
case Constants.SINK_TYPE_ICEBERG:
Preconditions.checkState(sinkInfo instanceof IcebergSinkInfo);
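The branch added above encodes the point of this commit: judging by the comment in the patch, the committer stage only exists to publish finished partitions, so a table with no partition columns can end its pipeline at the writer. A self-contained sketch of that decision (the stage names mirror the patch; everything else is a hypothetical stand-in, not InLong's types):

    import java.util.ArrayList;
    import java.util.List;

    public class SinkTopologySketch {
        // Returns the operator names for the sink pipeline, mirroring the
        // if/else added to Entrance.java above.
        static List<String> plan(String[] partitions) {
            List<String> stages = new ArrayList<>();
            stages.add("Hive Sink");
            if (partitions.length > 0) {
                // Only partitioned tables need the commit step.
                stages.add("Hive Committer");
            }
            return stages;
        }

        public static void main(String[] args) {
            System.out.println(plan(new String[0]));      // [Hive Sink]
            System.out.println(plan(new String[]{"dt"})); // [Hive Sink, Hive Committer]
        }
    }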
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java
new file mode 100644
index 0000000..1e42a90
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.hive;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.flink.hive.HiveWriter;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveSinkWithoutPartitionTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveSinkWithoutPartitionTestCase.class);
+
+ private static final CountDownLatch verificationFinishedLatch = new CountDownLatch(1);
+
+ private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final long dataflowId = 9527;
+
+ private final org.apache.inlong.sort.configuration.Configuration config =
+ new org.apache.inlong.sort.configuration.Configuration();
+
+ private FileSystem dfs;
+
+ private String dfsSchema;
+
+ private MiniDFSCluster hdfsCluster;
+
+ private String hdfsDataDir;
+
+ private final String hiveDb = "hive_db";
+
+ private final String hiveTable = "hive_table";
+
+ @Before
+ public void setUp() throws Exception {
+ initializeHdfs();
+ }
+
+ @After
+ public void cleanUp() {
+ shutdownHdfs();
+ }
+
+ @Test(timeout = 60000)
+ public void test() throws Exception {
+ config.setLong(Constants.SINK_HIVE_ROLLING_POLICY_CHECK_INTERVAL, 1000L);
+ config.setLong(Constants.SINK_HIVE_ROLLING_POLICY_ROLLOVER_INTERVAL, 1000L);
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(() -> {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(1000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.addSource(new TestingSourceFunction())
+ .setParallelism(1)
+ .process(new HiveWriter(config, dataflowId, prepareSinkInfo()))
+ .setParallelism(1);
+ try {
+ env.execute(); // will block here
+ } catch (Exception e) {
+ LOG.error("Unexpected exception thrown", e);
+ } finally {
+ jobFinishedLatch.countDown();
+ }
+ });
+
+ try {
+ boolean fileVerified = false;
+
+ while (true) {
+ if (!fileVerified) {
+ fileVerified = verifyHdfsFile();
+ //noinspection BusyWait
+ Thread.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ verificationFinishedLatch.countDown();
+ jobFinishedLatch.await();
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ private void initializeHdfs() throws IOException {
+ final File dataDir = tempFolder.newFolder();
+ final UserGroupInformation currentUser = UserGroupInformation.getLoginUser();
+
+ Configuration conf = new Configuration();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+ conf.set("hadoop.proxyuser." + currentUser.getUserName() + ".groups", "*");
+ conf.set("hadoop.proxyuser." + currentUser.getUserName() + ".hosts", "*");
+
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ hdfsCluster = builder.build();
+
+ dfs = hdfsCluster.getFileSystem();
+ dfsSchema = "hdfs://" + NetUtils.hostAndPortToUrlString(
+ hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
+ hdfsDataDir = "/user/test_user/root_path/" + hiveDb + ".db/" + hiveTable;
+
+ prepareHdfsPath();
+ }
+
+ private void shutdownHdfs() {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ hdfsCluster = null;
+ }
+ }
+
+ private void prepareHdfsPath() throws IOException {
+ final Path hdfsRootPath = new Path(hdfsDataDir);
+ assertTrue(dfs.mkdirs(hdfsRootPath));
+ dfs.setPermission(hdfsRootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ }
+
+ private HiveSinkInfo prepareSinkInfo() {
+ return new HiveSinkInfo(
+ new FieldInfo[]{
+ new FieldInfo("f1", StringFormatInfo.INSTANCE),
+ new FieldInfo("f2", IntFormatInfo.INSTANCE)
+ },
+ "127.0.0.1:10000",
+ hiveDb,
+ hiveTable,
+ "username",
+ "password",
+ dfsSchema + hdfsDataDir,
+ new HivePartitionInfo[]{},
+ new TextFileFormat('\t')
+ );
+ }
+
+ private boolean verifyHdfsFile() throws IOException {
+ final FileStatus[] fileStatuses = dfs.listStatus(new Path(hdfsDataDir));
+ if (fileStatuses == null || fileStatuses.length < 1) {
+ return false;
+ }
+
+ final List<String> results = new ArrayList<>();
+ for (FileStatus dataFileStatus : fileStatuses) {
+ final Path dataFilePath = dataFileStatus.getPath();
+
+ if (dataFileStatus.getLen() <= 0 || dataFilePath.getName().startsWith(".")) {
+ // A file name starting with '.' denotes an in-progress file
+ continue;
+ }
+
+ // Read data files
+ final FSDataInputStream inputStream = dfs.open(dataFilePath);
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+ String line;
+ while ((line = br.readLine()) != null) {
+ results.add(line);
+ }
+ }
+ }
+
+ // Verify results
+ if (results.size() < 10) {
+ return false;
+ }
+
+ Collections.sort(results);
+ for (int i = 0; i < results.size(); ++i) {
+ assertEquals("name_" + i + "\t" + i, results.get(i));
+ }
+ return true;
+ }
+
+ private static class TestingSourceFunction extends RichSourceFunction<Row> {
+
+ private static final long serialVersionUID = -5969057934139454695L;
+
+ @Override
+ public void run(SourceContext<Row> sourceContext) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ sourceContext.collect(Row.of("name_" + i, i));
+ }
+ verificationFinishedLatch.await();
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ }
+}
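The new test coordinates its two threads with the pair of CountDownLatches declared at the top: TestingSourceFunction blocks on verificationFinishedLatch so the streaming job stays alive while the main thread polls HDFS, and the main thread then blocks on jobFinishedLatch until the job winds down. A stripped-down sketch of that handshake (names illustrative, not from the commit):

    import java.util.concurrent.CountDownLatch;

    public class LatchHandshakeSketch {
        private static final CountDownLatch verified = new CountDownLatch(1);
        private static final CountDownLatch jobDone = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            Thread job = new Thread(() -> {
                try {
                    // Emit records, then block, like TestingSourceFunction.run().
                    verified.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // Mirrors the finally block around env.execute().
                    jobDone.countDown();
                }
            });
            job.start();

            // The main thread plays the polling verifier; once output is
            // confirmed it releases the source, then waits for the job.
            verified.countDown();
            jobDone.await();
        }
    }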