Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/11 02:39:03 UTC

[incubator-inlong] branch master updated: [INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7800dee  [INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)
7800dee is described below

commit 7800dee6ff2c88aa03fdb34f2f6e5fd83d7d53bb
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Fri Feb 11 10:38:58 2022 +0800

    [INLONG-2413][Sort]Support non-partitioned table when using hive sink (#2428)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 inlong-sort/pom.xml                                |   4 +
 .../inlong/sort/protocol/sink/HiveSinkInfo.java    |   9 +-
 inlong-sort/sort-single-tenant/pom.xml             |   6 +
 .../inlong/sort/singletenant/flink/Entrance.java   |  26 ++-
 .../hive/HiveSinkWithoutPartitionTestCase.java     | 246 +++++++++++++++++++++
 5 files changed, 279 insertions(+), 12 deletions(-)
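
For context, the non-partitioned case enabled here can be summarized with a short
sketch (the values below are illustrative placeholders; the constructor arguments
mirror the test added in this commit). A HiveSinkInfo built with an empty
HivePartitionInfo array is now wired by Entrance to a HiveWriter without the
trailing HiveCommitter operator:

    // Sketch only: host, credentials, and paths are placeholders.
    HiveSinkInfo sinkInfo = new HiveSinkInfo(
            new FieldInfo[] {
                    new FieldInfo("f1", StringFormatInfo.INSTANCE),
                    new FieldInfo("f2", IntFormatInfo.INSTANCE)
            },
            "127.0.0.1:10000",                 // Hive server address
            "hive_db",                         // database
            "hive_table",                      // table
            "username",
            "password",
            "hdfs://namenode:8020/warehouse/hive_db.db/hive_table", // data path
            new HivePartitionInfo[]{},         // no partitions -> committer is skipped
            new TextFileFormat('\t'));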

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index a57e9a2..14cbd4f 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -301,6 +301,10 @@
                         <groupId>xml-apis</groupId>
                         <artifactId>xml-apis</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>org.apache.avro</groupId>
+                        <artifactId>avro</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
index c4ff8d3..89ee544 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/HiveSinkInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.protocol.sink;
 
+import java.io.Serializable;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -188,7 +189,7 @@ public class HiveSinkInfo extends SinkInfo {
     public interface HiveFileFormat {
     }
 
-    public static class TextFileFormat implements HiveFileFormat {
+    public static class TextFileFormat implements HiveFileFormat, Serializable {
 
         @JsonProperty("splitter")
         private final Character splitter;
@@ -220,7 +221,7 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class OrcFileFormat implements HiveFileFormat {
+    public static class OrcFileFormat implements HiveFileFormat, Serializable {
 
         @JsonProperty("batch_size")
         private final int batchSize;
@@ -236,7 +237,7 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class SequenceFileFormat implements HiveFileFormat {
+    public static class SequenceFileFormat implements HiveFileFormat, Serializable {
 
         @JsonProperty("splitter")
         private final Character splitter;
@@ -262,7 +263,7 @@ public class HiveSinkInfo extends SinkInfo {
         }
     }
 
-    public static class ParquetFileFormat implements HiveFileFormat {
+    public static class ParquetFileFormat implements HiveFileFormat, Serializable {
 
         public ParquetFileFormat() {
         }
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 4f32c57..c2d8115 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -117,6 +117,12 @@
             <artifactId>curator-test</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index 479da10..cfbd134 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -127,14 +127,24 @@ public class Entrance {
                 Preconditions.checkState(sinkInfo instanceof HiveSinkInfo);
                 HiveSinkInfo hiveSinkInfo = (HiveSinkInfo) sinkInfo;
 
-                sourceStream
-                        .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
-                        .uid(Constants.SINK_UID)
-                        .name("Hive Sink")
-                        .setParallelism(sinkParallelism)
-                        .addSink(new HiveCommitter(config, hiveSinkInfo))
-                        .name("Hive Committer")
-                        .setParallelism(1);
+                if (hiveSinkInfo.getPartitions().length == 0) {
+                    // The committer operator is not necessary when no partitions exist.
+                    sourceStream
+                            .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
+                            .uid(Constants.SINK_UID)
+                            .name("Hive Sink")
+                            .setParallelism(sinkParallelism);
+                } else {
+                    sourceStream
+                            .process(new HiveWriter(config, dataflowId, hiveSinkInfo))
+                            .uid(Constants.SINK_UID)
+                            .name("Hive Sink")
+                            .setParallelism(sinkParallelism)
+                            .addSink(new HiveCommitter(config, hiveSinkInfo))
+                            .name("Hive Committer")
+                            .setParallelism(1);
+                }
+
                 break;
             case Constants.SINK_TYPE_ICEBERG:
                 Preconditions.checkState(sinkInfo instanceof IcebergSinkInfo);
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java
new file mode 100644
index 0000000..1e42a90
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/hive/HiveSinkWithoutPartitionTestCase.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.hive;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.NetUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.flink.hive.HiveWriter;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveSinkWithoutPartitionTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(HiveSinkWithoutPartitionTestCase.class);
+
+    private static final CountDownLatch verificationFinishedLatch = new CountDownLatch(1);
+
+    private static final CountDownLatch jobFinishedLatch = new CountDownLatch(1);
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    private static final long dataflowId = 9527;
+
+    private final org.apache.inlong.sort.configuration.Configuration config =
+            new org.apache.inlong.sort.configuration.Configuration();
+
+    private FileSystem dfs;
+
+    private String dfsSchema;
+
+    private MiniDFSCluster hdfsCluster;
+
+    private String hdfsDataDir;
+
+    private final String hiveDb = "hive_db";
+
+    private final String hiveTable = "hive_table";
+
+    @Before
+    public void setUp() throws Exception {
+        initializeHdfs();
+    }
+
+    @After
+    public void cleanUp() {
+        shutdownHdfs();
+    }
+
+    @Test(timeout = 60000)
+    public void test() throws Exception {
+        config.setLong(Constants.SINK_HIVE_ROLLING_POLICY_CHECK_INTERVAL, 1000L);
+        config.setLong(Constants.SINK_HIVE_ROLLING_POLICY_ROLLOVER_INTERVAL, 1000L);
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(() -> {
+            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.enableCheckpointing(1000L);
+            env.setRestartStrategy(RestartStrategies.noRestart());
+            env.addSource(new TestingSourceFunction())
+                    .setParallelism(1)
+                    .process(new HiveWriter(config, dataflowId, prepareSinkInfo()))
+                    .setParallelism(1);
+            try {
+                env.execute(); // will block here
+            } catch (Exception e) {
+                LOG.error("Unexpected exception thrown", e);
+            } finally {
+                jobFinishedLatch.countDown();
+            }
+        });
+
+        try {
+            boolean fileVerified = false;
+
+            while (true) {
+                if (!fileVerified) {
+                    fileVerified = verifyHdfsFile();
+                    //noinspection BusyWait
+                    Thread.sleep(1000);
+                } else {
+                    break;
+                }
+            }
+            verificationFinishedLatch.countDown();
+            jobFinishedLatch.await();
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void initializeHdfs() throws IOException {
+        final File dataDir = tempFolder.newFolder();
+        final UserGroupInformation currentUser = UserGroupInformation.getLoginUser();
+
+        Configuration conf = new Configuration();
+        conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath());
+        conf.set("hadoop.proxyuser." + currentUser.getUserName() + ".groups", "*");
+        conf.set("hadoop.proxyuser." + currentUser.getUserName() + ".hosts", "*");
+
+        MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+        hdfsCluster = builder.build();
+
+        dfs = hdfsCluster.getFileSystem();
+        dfsSchema = "hdfs://" + NetUtils.hostAndPortToUrlString(
+                hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort());
+        hdfsDataDir = "/user/test_user/root_path/" + hiveDb + ".db/" + hiveTable;
+
+        prepareHdfsPath();
+    }
+
+    private void shutdownHdfs() {
+        if (hdfsCluster != null) {
+            hdfsCluster.shutdown();
+            hdfsCluster = null;
+        }
+    }
+
+    private void prepareHdfsPath() throws IOException {
+        final Path hdfsRootPath = new Path(hdfsDataDir);
+        assertTrue(dfs.mkdirs(hdfsRootPath));
+        dfs.setPermission(hdfsRootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    }
+
+    private HiveSinkInfo prepareSinkInfo() {
+        return new HiveSinkInfo(
+                new FieldInfo[]{
+                        new FieldInfo("f1", StringFormatInfo.INSTANCE),
+                        new FieldInfo("f2", IntFormatInfo.INSTANCE)
+                },
+                "127.0.0.1:10000",
+                hiveDb,
+                hiveTable,
+                "username",
+                "password",
+                dfsSchema + hdfsDataDir,
+                new HivePartitionInfo[]{},
+                new TextFileFormat("\t".charAt(0))
+        );
+    }
+
+    private boolean verifyHdfsFile() throws IOException {
+        final FileStatus[] fileStatuses = dfs.listStatus(new Path(hdfsDataDir));
+        if (fileStatuses == null || fileStatuses.length < 1) {
+            return false;
+        }
+
+        final List<String> results = new ArrayList<>();
+        for (FileStatus dataFileStatus : fileStatuses) {
+            final Path dataFilePath = dataFileStatus.getPath();
+
+            if (dataFileStatus.getLen() <= 0 || dataFilePath.getName().startsWith(".")) {
+                // A file name starting with "." indicates an in-progress file
+                continue;
+            }
+
+            // Read data files
+            final FSDataInputStream inputStream = dfs.open(dataFilePath);
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = br.readLine()) != null) {
+                    results.add(line);
+                }
+            }
+        }
+
+        // Verify results
+        if (results.size() < 10) {
+            return false;
+        }
+
+        Collections.sort(results);
+        for (int i = 0; i < results.size(); ++i) {
+            assertEquals("name_" + i + "\t" + i, results.get(i));
+        }
+        return true;
+    }
+
+    private static class TestingSourceFunction extends RichSourceFunction<Row> {
+
+        private static final long serialVersionUID = -5969057934139454695L;
+
+        @Override
+        public void run(SourceContext<Row> sourceContext) throws Exception {
+            for (int i = 0; i < 10; i++) {
+                sourceContext.collect(Row.of("name_" + i, i));
+            }
+            verificationFinishedLatch.await();
+        }
+
+        @Override
+        public void cancel() {
+
+        }
+    }
+}
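
The added test case can presumably be run on its own with the standard Maven test
selector (module path taken from the diffstat above):

    mvn test -pl inlong-sort/sort-single-tenant -am -Dtest=HiveSinkWithoutPartitionTestCase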