You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/23 06:06:09 UTC
[flink-table-store] branch master updated: [FLINK-29071] Fix Table Store Hive CDH support
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new fe57dfa7 [FLINK-29071] Fix Table Store Hive CDH support
fe57dfa7 is described below
commit fe57dfa704117ff9f14dcf39bdb1dcba6e826972
Author: tsreaper <ts...@gmail.com>
AuthorDate: Tue Aug 23 14:05:39 2022 +0800
[FLINK-29071] Fix Table Store Hive CDH support
This closes #271
---
.github/workflows/build-different-versions.yml | 4 +-
docs/content/docs/engines/build.md | 6 +++
docs/content/docs/engines/hive.md | 4 ++
docs/content/docs/engines/overview.md | 1 +
.../table/store/hive/SerializableHiveConf.java | 46 ++++------------------
flink-table-store-hive/pom.xml | 13 ++++++
6 files changed, 34 insertions(+), 40 deletions(-)
diff --git a/.github/workflows/build-different-versions.yml b/.github/workflows/build-different-versions.yml
index dea1f18b..911c98e6 100644
--- a/.github/workflows/build-different-versions.yml
+++ b/.github/workflows/build-different-versions.yml
@@ -16,10 +16,10 @@ jobs:
- name: Build Flink 1.15
run: |
mvn clean install -Dmaven.test.skip=true
- - name: Build Flink 1.15 & Hive 2.2
+ - name: Build Hive 2.2
run: |
mvn clean install -Dmaven.test.skip=true -Phive-2.2 -f flink-table-store-hive
- - name: Build Flink 1.15 & Hive 2.1
+ - name: Build Hive 2.1
run: |
mvn clean install -Dmaven.test.skip=true -Phive-2.1 -f flink-table-store-hive
- name: Build Flink 1.14
diff --git a/docs/content/docs/engines/build.md b/docs/content/docs/engines/build.md
index 57f58bff..b23fba0e 100644
--- a/docs/content/docs/engines/build.md
+++ b/docs/content/docs/engines/build.md
@@ -62,6 +62,12 @@ To build with Hive 2.1, run the following command.
mvn clean install -Dmaven.test.skip=true -Phive-2.1
```
+To build with Hive 2.1 CDH 6.3, run the following command.
+
+```bash
+mvn clean install -Dmaven.test.skip=true -Phive-2.1-cdh-6.3
+```
+
You can find Hive catalog jar in `./flink-table-store-hive/flink-table-store-hive-catalog/target/flink-table-store-hive-catalog-{{< version >}}.jar`.
You can find Hive connector jar in `./flink-table-store-hive/flink-table-store-hive-connector/target/flink-table-store-hive-connector-{{< version >}}.jar`.
diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index a51feb7e..b96ebf2b 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -58,6 +58,8 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
{{< /unstable >}}
+If you're aiming for Hive 2.1 CDH 6.3, see [Build From Source]({{< ref "docs/engines/build" >}}) for more information.
+
To enable Table Store Hive Catalog support in Flink, you can pick one of the following two methods.
* Copy the jar file into the `lib` directory of your Flink installation directory. Note that this must be done before starting your Flink cluster.
* If you're using Flink's SQL client, append `--jar /path/to/flink-table-store-hive-catalog-{{< version >}}.jar` to the starting command of SQL client.
@@ -80,6 +82,8 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
{{< /unstable >}}
+If you're aiming for Hive 2.1 CDH 6.3, see [Build From Source]({{< ref "docs/engines/build" >}}) for more information.
+
There are several ways to add this jar to Hive.
* You can create an `auxlib` folder under the root directory of Hive, and copy `flink-table-store-hive-connector-{{< version >}}.jar` into `auxlib`.
diff --git a/docs/content/docs/engines/overview.md b/docs/content/docs/engines/overview.md
index 0de71fe2..ec3625a5 100644
--- a/docs/content/docs/engines/overview.md
+++ b/docs/content/docs/engines/overview.md
@@ -37,6 +37,7 @@ Apache Hive and Apache Spark.
| Flink | 1.14 | read, write, create/drop table, create/drop database | Projection, Filter |
| Flink | 1.15 | read, write, create/drop table, create/drop database | Projection, Filter |
| Hive | 2.1 | read | Projection, Filter |
+| Hive | 2.1 CDH 6.3 | read | Projection, Filter |
| Hive | 2.2 | read | Projection, Filter |
| Hive | 2.3 | read | Projection, Filter |
| Spark | 2.4 | read | Projection, Filter |
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
index 056598f1..1d2d2ebc 100644
--- a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/SerializableHiveConf.java
@@ -20,33 +20,26 @@ package org.apache.flink.table.store.hive;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-/** Wrap {@link JobConf} to a serializable class. */
+/** Wrap {@link HiveConf} to a serializable class. */
public class SerializableHiveConf implements Serializable {
private static final long serialVersionUID = 1L;
- private transient JobConf jobConf;
+ private transient HiveConf conf;
public SerializableHiveConf(HiveConf conf) {
- this.jobConf = createJobConfWithCredentials(conf);
+ this.conf = conf;
}
public HiveConf conf() {
- HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class);
- // to make sure Hive configuration properties in conf not be overridden
- hiveConf.addResource(jobConf);
- return hiveConf;
+ return conf;
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -55,7 +48,7 @@ public class SerializableHiveConf implements Serializable {
// we write the HiveConf through a separate serializer to avoid cryptic exceptions when it
// corrupts the serialization stream
final DataOutputSerializer ser = new DataOutputSerializer(256);
- jobConf.write(ser);
+ conf.write(ser);
out.writeInt(ser.length());
out.write(ser.getSharedBuffer(), 0, ser.length());
}
@@ -66,36 +59,13 @@ public class SerializableHiveConf implements Serializable {
final byte[] data = new byte[in.readInt()];
in.readFully(data);
final DataInputDeserializer deser = new DataInputDeserializer(data);
- this.jobConf = new JobConf();
+ this.conf = new HiveConf();
try {
- jobConf.readFields(deser);
+ conf.readFields(deser);
} catch (IOException e) {
throw new IOException(
- "Could not deserialize JobConf, the serialized and de-serialized don't match.",
+ "Could not deserialize HiveConf, the serialized and de-serialized don't match.",
e);
}
- Credentials currentUserCreds = UserGroupInformation.getCurrentUser().getCredentials();
- if (currentUserCreds != null) {
- jobConf.getCredentials().addAll(currentUserCreds);
- }
- }
-
- private static void addCredentialsIntoJobConf(JobConf jobConf) {
- UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new RuntimeException("Unable to determine current user", e);
- }
- Credentials currentUserCreds = currentUser.getCredentials();
- if (currentUserCreds != null) {
- jobConf.getCredentials().mergeAll(currentUserCreds);
- }
- }
-
- private static JobConf createJobConfWithCredentials(Configuration configuration) {
- JobConf jobConf = new JobConf(configuration);
- addCredentialsIntoJobConf(jobConf);
- return jobConf;
}
}
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/pom.xml
index 95a0da5a..0c81be0d 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/pom.xml
@@ -59,6 +59,19 @@ under the License.
<hive.version>2.2.0</hive.version>
</properties>
</profile>
+
+ <profile>
+ <id>hive-2.1-cdh-6.3</id>
+ <properties>
+ <hive.version>2.1.1-cdh6.3.4</hive.version>
+ </properties>
+ <repositories>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+ </repositories>
+ </profile>
</profiles>
</project>
\ No newline at end of file