You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/23 02:59:33 UTC

[incubator-inlong] branch master updated: [INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)

This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 50eaafd  [INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)
50eaafd is described below

commit 50eaafdd528de6db47c678b9f5ca2fd46095662b
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Wed Feb 23 10:59:28 2022 +0800

    [INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 .../inlong/sort/configuration/Constants.java       |   2 +
 inlong-sort/sort-single-tenant/pom.xml             |   6 ++
 .../flink/hive/filesystems/CHDFSFsFactory.java     | 111 +++++++++++++++++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |  16 +++
 4 files changed, 135 insertions(+)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index c6e8067..4e43e27 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -336,4 +336,6 @@ public class Constants {
     public static final ConfigOption<Integer> ORC_SINK_BATCH_SIZE =
             key(HIVE_SINK_PREFIX + "orc.row.batch.size")
                     .defaultValue(64);
+
+    public static final String CHDFS_CONFIG_PREFIX = "fs.ofs.";
 }
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 200e03e..dc8025b 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -130,6 +130,12 @@
             <artifactId>inlong-common</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.qcloud</groupId>
+            <artifactId>chdfs_hadoop_plugin_network</artifactId>
+            <version>2.5</version>
+        </dependency>
+
         <!-- test -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java
new file mode 100644
index 0000000..5b72d6b
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.hive.filesystems;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.inlong.sort.configuration.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink {@link FileSystemFactory} for the "ofs" scheme, backed by {@link CHDFSHadoopFileSystemAdapter}.
+ */
+public class CHDFSFsFactory implements FileSystemFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CHDFSFsFactory.class);
+
+    /** Flink's configuration object, as handed to {@link #configure(Configuration)}. */
+    private Configuration flinkConfig;
+
+    /** Hadoop config holding the {@link Constants#CHDFS_CONFIG_PREFIX} Flink options; built lazily, then reused. */
+    private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+    @Override
+    public String getScheme() {
+        return "ofs";
+    }
+
+    /** Stores the Flink configuration and drops any Hadoop configuration derived from a previous one. */
+    @Override
+    public void configure(Configuration config) {
+        flinkConfig = config;
+        hadoopConfig = null; // reset the Hadoop Config
+    }
+
+    /**
+     * Creates a CHDFS-backed file system for the given URI.
+     *
+     * @param fsUri URI of the file system; must not be null and must carry a scheme
+     * @return the initialized CHDFS file system wrapped in a Flink {@link HadoopFileSystem}
+     * @throws IOException if the file system cannot be instantiated or initialized
+     */
+    @Override
+    public FileSystem create(URI fsUri) throws IOException {
+        checkNotNull(fsUri);
+        checkArgument(fsUri.getScheme() != null, "file system has null scheme");
+        try {
+            // -- (1) get the loaded Hadoop config
+            final org.apache.hadoop.conf.Configuration conf = getOrCreateHadoopConfig();
+            // -- (2) instantiate the Hadoop file system
+            final org.apache.hadoop.fs.FileSystem hadoopFs = new CHDFSHadoopFileSystemAdapter();
+            // -- (3) configure the Hadoop file system
+            try {
+                hadoopFs.initialize(fsUri, conf);
+            } catch (UnknownHostException e) {
+                throw new IOException("The Hadoop file system's authority (" + fsUri.getAuthority()
+                        + "), specified by either the file URI or the configuration, cannot be resolved.", e);
+            }
+            return new HadoopFileSystem(hadoopFs);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
+        }
+    }
+
+    /** Returns the cached Hadoop configuration, building it from the Flink configuration on first use. */
+    private org.apache.hadoop.conf.Configuration getOrCreateHadoopConfig() {
+        if (hadoopConfig != null) {
+            return hadoopConfig;
+        }
+        final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        if (flinkConfig == null) {
+            LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading"
+                    + " a Hadoop file system. Using configuration from the classpath.");
+            return conf;
+        }
+        for (String key : flinkConfig.keySet()) {
+            if (key.startsWith(Constants.CHDFS_CONFIG_PREFIX)) {
+                conf.set(key, flinkConfig.getString(key, null));
+                LOG.debug("Adding Flink config entry for {} as {} to Hadoop config", key, conf.get(key));
+            }
+        }
+        hadoopConfig = conf; // cache it: the field was never assigned before, so every create() rebuilt it
+        return conf;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..9b55f09
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.singletenant.flink.hive.filesystems.CHDFSFsFactory