Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/28 07:57:06 UTC

[incubator-streampark-website] branch dev updated: update hadoop-resource-integration (#148)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark-website.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5047aa90 update hadoop-resource-integration (#148)
5047aa90 is described below

commit 5047aa90328a1b2ca54e9742071114703f227666
Author: WHJian <le...@126.com>
AuthorDate: Wed Sep 28 15:57:01 2022 +0800

    update hadoop-resource-integration (#148)
    
    Co-authored-by: hujian.wang <hu...@garena.com>
---
 docs/flink-k8s/3-hadoop-resource-integration.md    | 171 ++++++++++++++++++++-
 .../flink-k8s/3-hadoop-resource-integration.md     | 171 ++++++++++++++++++++-
 2 files changed, 328 insertions(+), 14 deletions(-)

diff --git a/docs/flink-k8s/3-hadoop-resource-integration.md b/docs/flink-k8s/3-hadoop-resource-integration.md
index fb560255..e498a5e1 100644
--- a/docs/flink-k8s/3-hadoop-resource-integration.md
+++ b/docs/flink-k8s/3-hadoop-resource-integration.md
@@ -4,15 +4,172 @@ title: 'Hadoop resource integration'
 sidebar_position: 3
 ---
 
-## Using Hadoop resource in K8s
+## Using Hadoop resources in Flink on K8s
 
+To use Hadoop resources under the StreamPark Flink-K8s runtime, such as mounting checkpoints on HDFS or reading and writing Hive, the general process is as follows:
 
-To use Hadoop resource  in StreamPark Flink-K8s runtime such as write/read hive、mount checkpoint at HDFS and son on,  User should make Flink Base Docker Image which contains:
+#### 1. HDFS
+
+To store Flink-on-K8s related resources (such as checkpoints) in HDFS, the following two steps are required:
+
+##### i. Add the shaded Hadoop jar
+
+By default, the Flink image pulled from Docker Hub does not include the Hadoop-related jars. Taking flink:1.14.5-scala_2.12-java8 as an example, its lib directory looks like this:
+
+```shell
+[flink@ff]  /opt/flink-1.14.5/lib
+$ ls
+flink-csv-1.14.5.jar        flink-shaded-zookeeper-3.4.14.jar  log4j-api-2.17.1.jar
+flink-dist_2.12-1.14.5.jar  flink-table_2.12-1.14.5.jar        log4j-core-2.17.1.jar
+flink-json-1.14.5.jar       log4j-1.2-api-2.17.1.jar           log4j-slf4j-impl-2.17.1.jar
+```
+
+So you need to download the shaded jar and put it into Flink's lib directory. Taking Hadoop 2 as an example, download `flink-shaded-hadoop-2-uber`: https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
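+
+For reference, one common way to get the shaded jar into the runtime is to build a small custom image on top of the official one. A minimal sketch is shown below; the resulting image tag `myrepo/flink:1.14.5-hadoop2` is only an example:
+
+```shell
+# download the shaded Hadoop jar and bake it into a custom Flink image
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
+cat > Dockerfile <<'EOF'
+FROM flink:1.14.5-scala_2.12-java8
+COPY flink-shaded-hadoop-2-uber-2.7.5-9.0.jar /opt/flink/lib/
+EOF
+docker build -t myrepo/flink:1.14.5-hadoop2 .
+docker push myrepo/flink:1.14.5-hadoop2
+```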
+
+Alternatively, the shaded jar can be declared as a dependency in the `Dependency` section of the StreamPark job configuration, for example:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+    <version>2.7.5-9.0</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+##### ii. Add `core-site.xml` and `hdfs-site.xml`
+
+Besides the shaded jar, Flink also needs the corresponding configuration files to locate the Hadoop cluster. Two files are involved here: `core-site.xml` and `hdfs-site.xml`. An analysis of the Flink source code (mainly the class org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters) shows that the two files are loaded in a fixed order, as follows:
+
+```java
+// The process of locating the Hadoop configuration files:
+// 1. Check whether the option kubernetes.hadoop.conf.config-map.name has been set
+@Override
+public Optional<String> getExistingHadoopConfigurationConfigMap() {
+    final String existingHadoopConfigMap =
+            flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
+    if (StringUtils.isBlank(existingHadoopConfigMap)) {
+        return Optional.empty();
+    } else {
+        return Optional.of(existingHadoopConfigMap.trim());
+    }
+}
+
+@Override
+public Optional<String> getLocalHadoopConfigurationDirectory() {
+    // 2. If the option in step 1 is not set, check whether the local environment that submits the native command has the HADOOP_CONF_DIR environment variable
+    final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
+    if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
+        return Optional.of(hadoopConfDirEnv);
+    }
+    // 3. If the environment variable in step 2 is not set either, fall back to the HADOOP_HOME environment variable
+    final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
+    if (StringUtils.isNotBlank(hadoopHomeEnv)) {
+        // Hadoop 2.x
+        final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
+        if (hadoop2ConfDir.exists()) {
+            return Optional.of(hadoop2ConfDir.getAbsolutePath());
+        }
+
+        // Hadoop 1.x
+        final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
+        if (hadoop1ConfDir.exists()) {
+            return Optional.of(hadoop1ConfDir.getAbsolutePath());
+        }
+    }
+
+    return Optional.empty();
+}
+
+final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
+// If "1", "2", and "3" are not found, there is no hadoop environment
+if (hadoopConfigurationFileItems.isEmpty()) {
+    LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());
+    return flinkPod;
+}
+// If "2" or "3" exists, it will look for the core-site.xml and hdfs-site.xml files in the path where the above environment variables are located
+private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
+    final List<String> expectedFileNames = new ArrayList<>();
+    expectedFileNames.add("core-site.xml");
+    expectedFileNames.add("hdfs-site.xml");
+
+    final File directory = new File(localHadoopConfigurationDirectory);
+    if (directory.exists() && directory.isDirectory()) {
+        return Arrays.stream(directory.listFiles())
+                .filter(
+                        file ->
+                                file.isFile()
+                                        && expectedFileNames.stream()
+                                                .anyMatch(name -> file.getName().equals(name)))
+                .collect(Collectors.toList());
+    } else {
+        return Collections.emptyList();
+    }
+}
+// If the above files are found, a Hadoop environment exists. The two files are parsed into key-value pairs and packaged into a ConfigMap, whose name follows this rule:
+public static String getHadoopConfConfigMapName(String clusterId) {
+    return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
+}
+```
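+
+For reference, once the shaded jar is in the image and a Hadoop configuration ConfigMap exists, a job can point its checkpoints at HDFS explicitly. The sketch below shows the relevant options on a native `flink run-application` command (in StreamPark the equivalent `-D` options can typically be supplied through the job configuration); the cluster id, image tag, ConfigMap name `hadoop-conf`, checkpoint path and jar path are only examples:
+
+```shell
+flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-flink-job \
+    -Dkubernetes.container.image=myrepo/flink:1.14.5-hadoop2 \
+    -Dkubernetes.hadoop.conf.config-map.name=hadoop-conf \
+    -Dstate.checkpoints.dir=hdfs:///flink/checkpoints \
+    local:///opt/flink/usrlib/my-job.jar
+```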
+
+
+
+#### 2. Hive
+
+To sink data into Hive, or to use the Hive Metastore as Flink's metadata store, the path from Flink to Hive must be opened up, which also requires the following two steps:
+
+##### i. Add Hive-related jars
+
+As mentioned above, the default Flink image does not include the Hive-related jars, so the following three jars need to be placed in Flink's lib directory. Hive 2.3.6 is used as an example here:
+
+a. `hive-exec`: https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
+
+b. `flink-connector-hive`: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar
+
+c. `flink-sql-connector-hive`: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar
+
+Similarly, the above Hive-related jars can also be declared in the `Dependency` section of the StreamPark job configuration, which will not be repeated here.
+
+##### ii. Add the Hive configuration file (hive-site.xml)
+
+Unlike HDFS, the Flink source code provides no default mechanism for loading the Hive configuration file, so developers need to add hive-site.xml manually. There are three main ways to do this:
+
+a. Package hive-site.xml into the custom Flink image; it is generally recommended to put it under the `/opt/flink/` directory of the image.
+
+b. Put hive-site.xml on a remote storage system such as HDFS and load it when it is needed.
+
+c. Mount hive-site.xml into K8s as a ConfigMap. This is the recommended approach, as follows:
+
+```shell
+# 1. Create a ConfigMap named hive-conf from hive-site.xml in the target namespace
+kubectl create cm hive-conf --from-file=hive-site.xml -n flink-test
+# 2. Inspect the hive-site.xml that is now stored in K8s
+kubectl describe cm hive-conf -n flink-test
+```
+
+Then mount this ConfigMap into the desired directory of the Flink container via the pod template:
+
+```yaml
+spec:
+  containers:
+    - name: flink-main-container
+      volumeMounts:
+        - mountPath: /opt/flink/hive
+          name: hive-conf
+  volumes:
+    - name: hive-conf
+      configMap:
+        name: hive-conf
+        items:
+          - key: hive-site.xml
+            path: hive-site.xml
+```
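+
+Once hive-site.xml is available inside the container (here under `/opt/flink/hive`), the job can register a Hive catalog that points at that directory. Below is a minimal Table API sketch assuming the Hive jars from step i are on the classpath; the catalog name and default database are examples:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+
+public class HiveCatalogExample {
+    public static void main(String[] args) {
+        TableEnvironment tableEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+
+        // "/opt/flink/hive" is the directory where hive-site.xml was mounted above
+        HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "default", "/opt/flink/hive");
+        tableEnv.registerCatalog("my_hive", hiveCatalog);
+        tableEnv.useCatalog("my_hive");
+
+        // Hive tables are now visible to Flink SQL, e.g.:
+        // tableEnv.executeSql("SHOW TABLES").print();
+    }
+}
+```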
+
+
+
+#### Conclusion
+
+Through the approach above, Flink can be connected to Hadoop and Hive. The same pattern generalizes to connecting Flink with other external systems such as Redis or MongoDB, which usually requires the following two steps:
+
+i. Load the connector jar of the external service in question.
+
+ii. If one exists, load the corresponding configuration file into the Flink system.
 
-*  Hadoop Lib, which is set as  `HADOOP_CLASSPATH`;
-*  Hadoop Config,which is set as  `HADOOP_CONF_DIR`;
-*  Hive Config if used;
-<br/>
 
-In next version, automatic integration for hadoop will be supported.
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flink-k8s/3-hadoop-resource-integration.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flink-k8s/3-hadoop-resource-integration.md
index 55581bd9..2380be89 100755
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/flink-k8s/3-hadoop-resource-integration.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/flink-k8s/3-hadoop-resource-integration.md
@@ -4,15 +4,172 @@ title: 'Hadoop 资源集成'
 sidebar_position: 3
 ---
 
-## Using Hadoop resources in K8s
+## Using Hadoop resources in Flink on K8s
 
-To use Hadoop resources under the StreamPark Flink-K8s runtime, such as mounting checkpoints on HDFS or reading and writing Hive, users currently need to build their own Flink base Docker image, which must contain the following:
+To use Hadoop resources under the StreamPark Flink-K8s runtime, such as mounting checkpoints on HDFS or reading and writing Hive, the general process is as follows:
 
-* Include the Hadoop libraries and set `HADOOP_CLASSPATH` to that directory;
-* Include the Hadoop configuration and set `HADOOP_CONF_DIR` to that directory;
-* If Hive is used, the Hive configuration must be included as well;
+#### 1. HDFS
+
+To store Flink-on-K8s related resources (such as checkpoints) in HDFS, the following two steps are required:
+
+##### i. Add the shaded Hadoop jar
+
+By default, the Flink image pulled from Docker Hub does not include the Hadoop-related jars. Taking flink:1.14.5-scala_2.12-java8 as an example, its lib directory looks like this:
+
+```shell
+[flink@ff]  /opt/flink-1.14.5/lib
+$ ls
+flink-csv-1.14.5.jar        flink-shaded-zookeeper-3.4.14.jar  log4j-api-2.17.1.jar
+flink-dist_2.12-1.14.5.jar  flink-table_2.12-1.14.5.jar        log4j-core-2.17.1.jar
+flink-json-1.14.5.jar       log4j-1.2-api-2.17.1.jar           log4j-slf4j-impl-2.17.1.jar
+```
+
+So you need to download the shaded jar and put it into Flink's lib directory. Taking Hadoop 2 as an example, download `flink-shaded-hadoop-2-uber`: https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
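+
+For reference, one common way to get the shaded jar into the runtime is to build a small custom image on top of the official one. A minimal sketch is shown below; the resulting image tag `myrepo/flink:1.14.5-hadoop2` is only an example:
+
+```shell
+# download the shaded Hadoop jar and bake it into a custom Flink image
+wget https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-9.0/flink-shaded-hadoop-2-uber-2.7.5-9.0.jar
+cat > Dockerfile <<'EOF'
+FROM flink:1.14.5-scala_2.12-java8
+COPY flink-shaded-hadoop-2-uber-2.7.5-9.0.jar /opt/flink/lib/
+EOF
+docker build -t myrepo/flink:1.14.5-hadoop2 .
+docker push myrepo/flink:1.14.5-hadoop2
+```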
+
+Alternatively, the shaded jar can be declared as a dependency in the `Dependency` section of the StreamPark job configuration, for example:
+
+```xml
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+    <version>2.7.5-9.0</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+##### ii. Add `core-site.xml` and `hdfs-site.xml`
+
+Besides the shaded jar, Flink also needs the corresponding configuration files to locate the Hadoop cluster. Two files are involved here: `core-site.xml` and `hdfs-site.xml`. An analysis of the Flink source code (mainly the class org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters) shows that the two files are loaded in a fixed order, as follows:
+
+```java
+// The process of locating the Hadoop configuration files:
+// 1. Check whether the option kubernetes.hadoop.conf.config-map.name has been set
+@Override
+public Optional<String> getExistingHadoopConfigurationConfigMap() {
+    final String existingHadoopConfigMap =
+            flinkConfig.getString(KubernetesConfigOptions.HADOOP_CONF_CONFIG_MAP);
+    if (StringUtils.isBlank(existingHadoopConfigMap)) {
+        return Optional.empty();
+    } else {
+        return Optional.of(existingHadoopConfigMap.trim());
+    }
+}
+
+@Override
+public Optional<String> getLocalHadoopConfigurationDirectory() {
+    // 2. If the option in step 1 is not set, check whether the local environment that submits the native command has the HADOOP_CONF_DIR environment variable
+    final String hadoopConfDirEnv = System.getenv(Constants.ENV_HADOOP_CONF_DIR);
+    if (StringUtils.isNotBlank(hadoopConfDirEnv)) {
+        return Optional.of(hadoopConfDirEnv);
+    }
+    // 3. If the environment variable in step 2 is not set either, fall back to the HADOOP_HOME environment variable
+    final String hadoopHomeEnv = System.getenv(Constants.ENV_HADOOP_HOME);
+    if (StringUtils.isNotBlank(hadoopHomeEnv)) {
+        // Hadoop 2.x
+        final File hadoop2ConfDir = new File(hadoopHomeEnv, "/etc/hadoop");
+        if (hadoop2ConfDir.exists()) {
+            return Optional.of(hadoop2ConfDir.getAbsolutePath());
+        }
+
+        // Hadoop 1.x
+        final File hadoop1ConfDir = new File(hadoopHomeEnv, "/conf");
+        if (hadoop1ConfDir.exists()) {
+            return Optional.of(hadoop1ConfDir.getAbsolutePath());
+        }
+    }
+
+    return Optional.empty();
+}
+
+final List<File> hadoopConfigurationFileItems = getHadoopConfigurationFileItems(localHadoopConfigurationDirectory.get());
+// If none of steps 1, 2 and 3 yields a result, there is no Hadoop environment
+if (hadoopConfigurationFileItems.isEmpty()) {
+    LOG.warn("Found 0 files in directory {}, skip to mount the Hadoop Configuration ConfigMap.", localHadoopConfigurationDirectory.get());
+    return flinkPod;
+}
+// If step 2 or 3 succeeds, look for core-site.xml and hdfs-site.xml under the resolved directory
+private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurationDirectory) {
+    final List<String> expectedFileNames = new ArrayList<>();
+    expectedFileNames.add("core-site.xml");
+    expectedFileNames.add("hdfs-site.xml");
+
+    final File directory = new File(localHadoopConfigurationDirectory);
+    if (directory.exists() && directory.isDirectory()) {
+        return Arrays.stream(directory.listFiles())
+                .filter(
+                        file ->
+                                file.isFile()
+                                        && expectedFileNames.stream()
+                                                .anyMatch(name -> file.getName().equals(name)))
+                .collect(Collectors.toList());
+    } else {
+        return Collections.emptyList();
+    }
+}
+// If the above files are found, a Hadoop environment exists. The two files are parsed into key-value pairs and packaged into a ConfigMap, whose name follows this rule:
+public static String getHadoopConfConfigMapName(String clusterId) {
+    return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
+}
+```
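+
+For reference, once the shaded jar is in the image and a Hadoop configuration ConfigMap exists, a job can point its checkpoints at HDFS explicitly. The sketch below shows the relevant options on a native `flink run-application` command (in StreamPark the equivalent `-D` options can typically be supplied through the job configuration); the cluster id, image tag, ConfigMap name `hadoop-conf`, checkpoint path and jar path are only examples:
+
+```shell
+flink run-application \
+    --target kubernetes-application \
+    -Dkubernetes.cluster-id=my-flink-job \
+    -Dkubernetes.container.image=myrepo/flink:1.14.5-hadoop2 \
+    -Dkubernetes.hadoop.conf.config-map.name=hadoop-conf \
+    -Dstate.checkpoints.dir=hdfs:///flink/checkpoints \
+    local:///opt/flink/usrlib/my-job.jar
+```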
+
+
+
+#### 2. Hive
+
+To sink data into Hive, or to use the Hive Metastore as Flink's metadata store, the path from Flink to Hive must be opened up, which also requires the following two steps:
+
+##### i. Add Hive-related jars
+
+As mentioned above, the default Flink image does not include the Hive-related jars, so the following three jars need to be placed in Flink's lib directory. Hive 2.3.6 is used as an example here:
+
+a. `hive-exec`: https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.6/hive-exec-2.3.6.jar
+
+b. `flink-connector-hive`: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.14.5/flink-connector-hive_2.12-1.14.5.jar
+
+c. `flink-sql-connector-hive`: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.5/flink-sql-connector-hive-2.3.6_2.12-1.14.5.jar
+
+Similarly, the above Hive-related jars can also be declared in the `Dependency` section of the StreamPark job configuration, which will not be repeated here.
+
+##### ii. Add the Hive configuration file (hive-site.xml)
+
+Unlike HDFS, the Flink source code provides no default mechanism for loading the Hive configuration file, so developers need to add hive-site.xml manually. There are three main ways to do this:
+
+a. Package hive-site.xml into the custom Flink image; it is generally recommended to put it under the `/opt/flink/` directory of the image.
+
+b. Put hive-site.xml on a remote storage system such as HDFS and load it when it is needed.
+
+c. Mount hive-site.xml into K8s as a ConfigMap. This is the recommended approach, as follows:
+
+```shell
+# 1. Create a ConfigMap named hive-conf from hive-site.xml in the target namespace
+kubectl create cm hive-conf --from-file=hive-site.xml -n flink-test
+# 2. Inspect the hive-site.xml that is now stored in K8s
+kubectl describe cm hive-conf -n flink-test
+```
+
+Then mount this ConfigMap into the desired directory of the Flink container via the pod template:
+
+```yaml
+spec:
+  containers:
+    - name: flink-main-container
+      volumeMounts:
+        - mountPath: /opt/flink/hive
+          name: hive-conf
+  volumes:
+    - name: hive-conf
+      configMap:
+        name: hive-conf
+        items:
+          - key: hive-site.xml
+            path: hive-site.xml
+```
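+
+Once hive-site.xml is available inside the container (here under `/opt/flink/hive`), the job can register a Hive catalog that points at that directory. Below is a minimal Table API sketch assuming the Hive jars from step i are on the classpath; the catalog name and default database are examples:
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+
+public class HiveCatalogExample {
+    public static void main(String[] args) {
+        TableEnvironment tableEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+
+        // "/opt/flink/hive" is the directory where hive-site.xml was mounted above
+        HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "default", "/opt/flink/hive");
+        tableEnv.registerCatalog("my_hive", hiveCatalog);
+        tableEnv.useCatalog("my_hive");
+
+        // Hive tables are now visible to Flink SQL, e.g.:
+        // tableEnv.executeSql("SHOW TABLES").print();
+    }
+}
+```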
+
+
+
+#### Conclusion
+
+Through the approach above, Flink can be connected to Hadoop and Hive. The same pattern generalizes to connecting Flink with other external systems such as Redis or MongoDB, which usually requires the following two steps:
+
+i. Load the connector jar of the external service in question.
+
+ii. If one exists, load the corresponding configuration file into the Flink system.
 
-<br/>
 
-This is actually not very elegant 🥲. We will support **automatic Hadoop integration** in a subsequent version, please look forward to it!