Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/23 06:06:09 UTC

[flink-table-store] branch master updated: [FLINK-29071] Fix Table Store Hive CDH support

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new fe57dfa7 [FLINK-29071] Fix Table Store Hive CDH support
fe57dfa7 is described below

commit fe57dfa704117ff9f14dcf39bdb1dcba6e826972
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Aug 23 14:05:39 2022 +0800

    [FLINK-29071] Fix Table Store Hive CDH support
    
    This closes #271
---
 .github/workflows/build-different-versions.yml     |  4 +-
 docs/content/docs/engines/build.md                 |  6 +++
 docs/content/docs/engines/hive.md                  |  4 ++
 docs/content/docs/engines/overview.md              |  1 +
 .../table/store/hive/SerializableHiveConf.java     | 46 ++++------------------
 flink-table-store-hive/pom.xml                     | 13 ++++++
 6 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/.github/workflows/build-different-versions.yml b/.github/workflows/build-different-versions.yml
index dea1f18b..911c98e6 100644
--- a/.github/workflows/build-different-versions.yml
+++ b/.github/workflows/build-different-versions.yml
@@ -16,10 +16,10 @@ jobs:
       - name: Build Flink 1.15
         run: |
           mvn clean install -Dmaven.test.skip=true
-      - name: Build Flink 1.15 & Hive 2.2
+      - name: Build Hive 2.2
         run: |
           mvn clean install -Dmaven.test.skip=true -Phive-2.2 -f flink-table-store-hive
-      - name: Build Flink 1.15 & Hive 2.1
+      - name: Build Hive 2.1
         run: |
           mvn clean install -Dmaven.test.skip=true -Phive-2.1 -f flink-table-store-hive
       - name: Build Flink 1.14
diff --git a/docs/content/docs/engines/build.md b/docs/content/docs/engines/build.md
index 57f58bff..b23fba0e 100644
--- a/docs/content/docs/engines/build.md
+++ b/docs/content/docs/engines/build.md
@@ -62,6 +62,12 @@ To build with Hive 2.1, run the following command.
 mvn clean install -Dmaven.test.skip=true -Phive-2.1
 ```
 
+To build with Hive 2.1 CDH 6.3, run the following command.
+
+```bash
+mvn clean install -Dmaven.test.skip=true -Phive-2.1-cdh-6.3
+```
+
 You can find Hive catalog jar in `./flink-table-store-hive/flink-table-store-hive-catalog/target/flink-table-store-hive-catalog-{{< version >}}.jar`. 
 
 You can find Hive connector jar in `./flink-table-store-hive/flink-table-store-hive-connector/target/flink-table-store-hive-connector-{{< version >}}.jar`.
diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index a51feb7e..b96ebf2b 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -58,6 +58,8 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
 
 {{< /unstable >}}
 
+If you're aiming for Hive 2.1 CDH 6.3, see [Build From Source]({{< ref "docs/engines/build" >}}) for more information.
+
 To enable Table Store Hive Catalog support in Flink, you can pick one of the following two methods.
 * Copy the jar file into the `lib` directory of your Flink installation directory. Note that this must be done before starting your Flink cluster.
 * If you're using Flink's SQL client, append `--jar /path/to/flink-table-store-hive-catalog-{{< version >}}.jar` to the starting command of SQL client.
@@ -80,6 +82,8 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
 
 {{< /unstable >}}
 
+If you're aiming for Hive 2.1 CDH 6.3, see [Build From Source]({{< ref "docs/engines/build" >}}) for more information.
+
 There are several ways to add this jar to Hive.
 
 * You can create an `auxlib` folder under the root directory of Hive, and copy `flink-table-store-hive-connector-{{< version >}}.jar` into `auxlib`.
diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index 0de71fe2..ec3625a5 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -37,6 +37,7 @@ Apache Hive and Apache Spark.
 | Flink     | 1.14     | read, write, create/drop table, create/drop database | Projection, Filter |
 | Flink     | 1.15     | read, write, create/drop table, create/drop database | Projection, Filter |
 | Hive      | 2.1      | read                                                 | Projection, Filter |
+| Hive      | 2.1 CDH 6.3 | read                                                 | Projection, Filter |
 | Hive      | 2.2      | read                                                 | Projection, Filter |
 | Hive      | 2.3      | read                                                 | Projection, Filter |
 | Spark     | 2.4      | read                                                 | Projection, Filter |
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
index 056598f1..1d2d2ebc 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
@@ -20,33 +20,26 @@ package org.apache.flink.table.store.hive;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
-/** Wrap {@link JobConf} to a serializable class. */
+/** Wrap {@link HiveConf} to a serializable class. */
 public class SerializableHiveConf implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    private transient JobConf jobConf;
+    private transient HiveConf conf;
 
     public SerializableHiveConf(HiveConf conf) {
-        this.jobConf = createJobConfWithCredentials(conf);
+        this.conf = conf;
     }
 
     public HiveConf conf() {
-        HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
-        // to make sure Hive configuration properties in conf not be overridden
-        hiveConf.addResource(jobConf);
-        return hiveConf;
+        return conf;
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -55,7 +48,7 @@ public class SerializableHiveConf implements Serializable {
         // we write the jobConf through a separate serializer to avoid cryptic exceptions when it
         // corrupts the serialization stream
         final DataOutputSerializer ser = new DataOutputSerializer(256);
-        jobConf.write(ser);
+        conf.write(ser);
         out.writeInt(ser.length());
         out.write(ser.getSharedBuffer(), 0, ser.length());
     }
@@ -66,36 +59,13 @@ public class SerializableHiveConf implements Serializable {
         final byte[] data = new byte[in.readInt()];
         in.readFully(data);
         final DataInputDeserializer deser = new DataInputDeserializer(data);
-        this.jobConf = new JobConf();
+        this.conf = new HiveConf();
         try {
-            jobConf.readFields(deser);
+            conf.readFields(deser);
         } catch (IOException e) {
             throw new IOException(
-                    "Could not deserialize JobConf, the serialized and de-serialized don't match.",
+                    "Could not deserialize HiveConf, the serialized and de-serialized don't match.",
                     e);
         }
-        Credentials currentUserCreds = UserGroupInformation.getCurrentUser().getCredentials();
-        if (currentUserCreds != null) {
-            jobConf.getCredentials().addAll(currentUserCreds);
-        }
-    }
-
-    private static void addCredentialsIntoJobConf(JobConf jobConf) {
-        UserGroupInformation currentUser;
-        try {
-            currentUser = UserGroupInformation.getCurrentUser();
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to determine current user", e);
-        }
-        Credentials currentUserCreds = currentUser.getCredentials();
-        if (currentUserCreds != null) {
-            jobConf.getCredentials().mergeAll(currentUserCreds);
-        }
-    }
-
-    private static JobConf createJobConfWithCredentials(Configuration configuration) {
-        JobConf jobConf = new JobConf(configuration);
-        addCredentialsIntoJobConf(jobConf);
-        return jobConf;
     }
 }
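
(For context only, not part of the commit: a minimal, hypothetical sketch of how the simplified `SerializableHiveConf` wrapper round-trips through plain Java serialization, which is what Flink does when shipping the catalog configuration to tasks. It assumes the `flink-table-store-hive-catalog` jar and the Hive client jars are on the classpath; the metastore URI and the class name `SerializableHiveConfRoundTrip` are placeholders.)

```java
import org.apache.flink.table.store.hive.SerializableHiveConf;

import org.apache.hadoop.hive.conf.HiveConf;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializableHiveConfRoundTrip {

    public static void main(String[] args) throws Exception {
        // Build a HiveConf with one property; the URI below is only a placeholder.
        HiveConf hiveConf = new HiveConf();
        hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

        // Wrap it in the serializable holder; after this commit the HiveConf is
        // serialized directly instead of being copied into a JobConf first.
        SerializableHiveConf wrapper = new SerializableHiveConf(hiveConf);

        // Round-trip through standard Java serialization.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(wrapper);
        }
        SerializableHiveConf restored;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            restored = (SerializableHiveConf) in.readObject();
        }

        // The property set before serialization should survive the round trip.
        System.out.println(restored.conf().get("hive.metastore.uris"));
    }
}
```
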
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/pom.xml
index 95a0da5a..0c81be0d 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/pom.xml
@@ -59,6 +59,19 @@ under the License.
                 <hive.version>2.2.0</hive.version>
             </properties>
         </profile>
+
+        <profile>
+            <id>hive-2.1-cdh-6.3</id>
+            <properties>
+                <hive.version>2.1.1-cdh6.3.4</hive.version>
+            </properties>
+            <repositories>
+                <repository>
+                    <id>cloudera</id>
+                    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+                </repository>
+            </repositories>
+        </profile>
     </profiles>
 
 </project>
\ No newline at end of file