You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2022/02/23 02:59:33 UTC
[incubator-inlong] branch master updated: [INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 50eaafd [INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)
50eaafd is described below
commit 50eaafdd528de6db47c678b9f5ca2fd46095662b
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Wed Feb 23 10:59:28 2022 +0800
[INLONG-2634][Sort]Support CHDFS filesystem when using hive sink (#2648)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
.../inlong/sort/configuration/Constants.java | 2 +
inlong-sort/sort-single-tenant/pom.xml | 6 ++
.../flink/hive/filesystems/CHDFSFsFactory.java | 111 +++++++++++++++++++++
.../org.apache.flink.core.fs.FileSystemFactory | 16 +++
4 files changed, 135 insertions(+)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index c6e8067..4e43e27 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -336,4 +336,6 @@ public class Constants {
public static final ConfigOption<Integer> ORC_SINK_BATCH_SIZE =
key(HIVE_SINK_PREFIX + "orc.row.batch.size")
.defaultValue(64);
+
+ /**
+ * Key prefix of the Flink configuration entries that CHDFSFsFactory copies into the Hadoop
+ * configuration used for the CHDFS ({@code ofs://}) file system.
+ */
+ public static final String CHDFS_CONFIG_PREFIX = "fs.ofs.";
}
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index 200e03e..dc8025b 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -130,6 +130,12 @@
<artifactId>inlong-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.qcloud</groupId>
+ <artifactId>chdfs_hadoop_plugin_network</artifactId>
+ <version>2.5</version>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java
new file mode 100644
index 0000000..5b72d6b
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/hive/filesystems/CHDFSFsFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.hive.filesystems;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter;
+import java.io.IOException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.inlong.sort.configuration.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flink {@link FileSystemFactory} for Tencent Cloud CHDFS, serving the {@code ofs} URI scheme.
+ *
+ * <p>Flink configuration entries whose keys start with {@link Constants#CHDFS_CONFIG_PREFIX} are
+ * copied into a Hadoop configuration. That configuration initializes a
+ * {@link CHDFSHadoopFileSystemAdapter}, which is exposed to Flink through {@link HadoopFileSystem}.
+ */
+public class CHDFSFsFactory implements FileSystemFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CHDFSFsFactory.class);
+
+    /**
+     * Flink's configuration object, as passed to {@link #configure(Configuration)}; null until then.
+     */
+    private Configuration flinkConfig;
+
+    /**
+     * Hadoop configuration derived from {@link #flinkConfig}. It is built by the first
+     * {@link #create(URI)} call after {@link #configure(Configuration)}, reused by later calls,
+     * and cleared again by {@link #configure(Configuration)}.
+     */
+    private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+    /**
+     * Returns the URI scheme this factory creates file systems for.
+     *
+     * @return {@code "ofs"}
+     */
+    @Override
+    public String getScheme() {
+        return "ofs";
+    }
+
+    /**
+     * Stores the Flink configuration and discards any Hadoop configuration derived from a previous
+     * one, so that the next {@link #create(URI)} call rebuilds it from the new settings.
+     *
+     * @param config the Flink configuration to read the CHDFS entries from
+     */
+    @Override
+    public void configure(Configuration config) {
+        flinkConfig = config;
+        hadoopConfig = null; // reset the Hadoop Config
+    }
+
+    /**
+     * Creates and initializes a CHDFS file system for the given URI.
+     *
+     * @param fsUri URI of the file system; must be non-null and have a scheme
+     * @return the initialized CHDFS file system wrapped in a Flink {@link HadoopFileSystem}
+     * @throws NullPointerException if {@code fsUri} is null
+     * @throws IllegalArgumentException if {@code fsUri} has no scheme
+     * @throws IOException if the URI authority cannot be resolved, or if the file system cannot be
+     *     instantiated or initialized
+     */
+    @Override
+    public FileSystem create(URI fsUri) throws IOException {
+        checkNotNull(fsUri);
+        final String scheme = fsUri.getScheme();
+        checkArgument(scheme != null, "file system has null scheme");
+
+        try {
+            // -- (1) get the loaded Hadoop config
+            final org.apache.hadoop.conf.Configuration hadoopConf = loadHadoopConfiguration();
+
+            // -- (2) instantiate the Hadoop file system
+            final org.apache.hadoop.fs.FileSystem hadoopFs = new CHDFSHadoopFileSystemAdapter();
+
+            // -- (3) configure the Hadoop file system
+            try {
+                hadoopFs.initialize(fsUri, hadoopConf);
+            } catch (UnknownHostException e) {
+                String message =
+                        "The Hadoop file system's authority ("
+                                + fsUri.getAuthority()
+                                + "), specified by either the file URI or the configuration, cannot be resolved.";
+
+                throw new IOException(message, e);
+            }
+
+            return new HadoopFileSystem(hadoopFs);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            // Report any other failure (e.g. a runtime exception while building the configuration
+            // or inside the adapter) through the checked exception this method declares.
+            throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
+        }
+    }
+
+    /**
+     * Returns the Hadoop configuration used to initialize CHDFS.
+     *
+     * <p>If a Flink configuration is present, the result holds every Flink entry whose key starts
+     * with {@link Constants#CHDFS_CONFIG_PREFIX}, and it is cached in {@link #hadoopConfig}.
+     * Otherwise a warning is logged and a default Hadoop configuration is returned uncached.
+     */
+    private org.apache.hadoop.conf.Configuration loadHadoopConfiguration() {
+        if (hadoopConfig != null) {
+            return hadoopConfig;
+        }
+        if (flinkConfig == null) {
+            LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading"
+                    + " a Hadoop file system. Using configuration from the classpath.");
+            return new org.apache.hadoop.conf.Configuration();
+        }
+
+        final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        for (String key : flinkConfig.keySet()) {
+            if (key.startsWith(Constants.CHDFS_CONFIG_PREFIX)) {
+                String value = flinkConfig.getString(key, null);
+                conf.set(key, value);
+                LOG.debug("Adding Flink config entry for {} as {} to Hadoop config",
+                        key, conf.get(key));
+            }
+        }
+
+        // Cache the derived configuration so that later create() calls reuse it instead of
+        // rebuilding it every time; configure() clears this cache.
+        hadoopConfig = conf;
+        return conf;
+    }
+}
diff --git a/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..9b55f09
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.singletenant.flink.hive.filesystems.CHDFSFsFactory