You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/30 09:05:15 UTC
[flink-table-store] branch master updated: [FLINK-28067] Introduce HiveCatalog
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new cb688c5b [FLINK-28067] Introduce HiveCatalog
cb688c5b is described below
commit cb688c5bed4e02ceff608d85a6578a73f4fe2c1f
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jun 30 17:05:10 2022 +0800
[FLINK-28067] Introduce HiveCatalog
This closes #181
---
.../flink/table/store/connector/FlinkCatalog.java | 12 +-
.../table/store/connector/FlinkCatalogFactory.java | 82 ++--
.../table/store/connector/FlinkCatalogTest.java | 5 +-
.../table/store/file/catalog/AbstractCatalog.java | 39 ++
.../flink/table/store/file/catalog/Catalog.java | 5 -
.../table/store/file/catalog/CatalogFactory.java | 29 ++
.../store/file/catalog/FileSystemCatalog.java | 34 +-
.../file/catalog/FileSystemCatalogFactory.java | 38 ++
...e.flink.table.store.file.catalog.CatalogFactory | 16 +
flink-table-store-dist/pom.xml | 7 +
flink-table-store-e2e-tests/pom.xml | 6 +-
.../flink/table/store/tests/E2eTestBase.java | 5 +-
.../flink/table/store/tests/HiveE2eTest.java | 61 ++-
.../test/resources-filtered/docker-compose.yaml | 8 +-
.../{ => flink-table-store-hive-catalog}/pom.xml | 141 ++++---
.../apache/flink/table/store/hive/HiveCatalog.java | 356 ++++++++++++++++
.../flink/table/store/hive/HiveCatalogFactory.java | 52 +++
...e.flink.table.store.file.catalog.CatalogFactory | 16 +
.../flink/table/store/hive/HiveCatalogITCase.java | 235 +++++++++++
.../flink-table-store-hive-common/pom.xml | 77 ++++
.../flink/table/store/hive/HiveTypeUtils.java | 55 ---
.../{ => flink-table-store-hive-connector}/pom.xml | 18 +-
.../apache/flink/table/store/RowDataContainer.java | 0
.../store/SearchArgumentToPredicateConverter.java | 0
.../flink/table/store/TableStoreJobConf.java | 0
.../apache/flink/table/store/hive/HiveSchema.java | 0
.../table/store/hive/TableStoreHiveMetaHook.java | 0
.../store/hive/TableStoreHiveStorageHandler.java | 0
.../flink/table/store/hive/TableStoreSerDe.java | 0
.../TableStoreCharObjectInspector.java | 0
.../TableStoreDateObjectInspector.java | 0
.../TableStoreDecimalObjectInspector.java | 0
.../TableStoreListObjectInspector.java | 3 +-
.../TableStoreMapObjectInspector.java | 5 +-
.../TableStoreObjectInspectorFactory.java | 79 ++++
.../TableStoreRowDataObjectInspector.java | 2 +-
.../TableStoreStringObjectInspector.java | 0
.../TableStoreTimestampObjectInspector.java | 0
.../TableStoreVarcharObjectInspector.java | 0
.../table/store/mapred/TableStoreInputFormat.java | 0
.../table/store/mapred/TableStoreInputSplit.java | 0
.../table/store/mapred/TableStoreOutputFormat.java | 0
.../table/store/mapred/TableStoreRecordReader.java | 0
.../flink/table/store/FileStoreTestUtils.java | 0
.../SearchArgumentToPredicateConverterTest.java | 0
.../table/store/hive/HiveTableSchemaTest.java | 0
.../store/hive/RandomGenericRowDataGenerator.java | 0
.../hive/TableStoreHiveStorageHandlerITCase.java | 3 +-
.../table/store/hive/TableStoreSerDeTest.java | 0
.../TableStoreCharObjectInspectorTest.java | 0
.../TableStoreDateObjectInspectorTest.java | 0
.../TableStoreDecimalObjectInspectorTest.java | 0
.../TableStoreListObjectInspectorTest.java | 0
.../TableStoreMapObjectInspectorTest.java | 0
.../TableStoreRowDataObjectInspectorTest.java | 0
.../TableStoreStringObjectInspectorTest.java | 0
.../TableStoreTimestampObjectInspectorTest.java | 0
.../TableStoreVarcharObjectInspectorTest.java | 0
.../store/mapred/TableStoreInputSplitTest.java | 0
.../store/mapred/TableStoreRecordReaderTest.java | 0
.../src/test/resources/log4j2-test.properties | 0
flink-table-store-hive/pom.xml | 459 +--------------------
pom.xml | 1 -
63 files changed, 1195 insertions(+), 654 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 232015e5..56840aa3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -220,13 +220,19 @@ public class FlinkCatalog extends AbstractCatalog {
return UpdateSchema.fromCatalogTable(table);
}
- // --------------------- unsupported methods ----------------------------
-
@Override
public final void open() throws CatalogException {}
@Override
- public final void close() throws CatalogException {}
+ public final void close() throws CatalogException {
+ try {
+ catalog.close();
+ } catch (Exception e) {
+ throw new CatalogException("Failed to close catalog " + catalog.toString(), e);
+ }
+ }
+
+ // --------------------- unsupported methods ----------------------------
@Override
public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 6937e888..6021260f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -21,29 +21,34 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
+import org.apache.flink.util.Preconditions;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
import java.util.Set;
-
-import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import java.util.stream.Collectors;
/** Factory for {@link FlinkCatalog}. */
-public class FlinkCatalogFactory implements CatalogFactory {
+public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
public static final String IDENTIFIER = "table-store";
- public static final ConfigOption<String> DEFAULT_DATABASE =
- ConfigOptions.key("default-database").stringType().defaultValue("default");
-
- public static final ConfigOption<String> WAREHOUSE =
+ public static final ConfigOption<String> METASTORE =
+ ConfigOptions.key("metastore")
+ .stringType()
+ .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+ private static final ConfigOption<String> WAREHOUSE =
ConfigOptions.key("warehouse")
.stringType()
.noDefaultValue()
.withDescription("The warehouse root path of catalog.");
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key("default-database").stringType().defaultValue("default");
@Override
public String factoryIdentifier() {
@@ -52,35 +57,58 @@ public class FlinkCatalogFactory implements CatalogFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(WAREHOUSE);
- return options;
+ return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(DEFAULT_DATABASE);
- options.add(PROPERTY_VERSION);
- return options;
+ return Collections.emptySet();
}
@Override
public FlinkCatalog createCatalog(Context context) {
FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
- helper.validate();
ReadableConfig options = helper.getOptions();
- return createCatalog(
- new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE));
+ return createCatalog(context.getName(), options);
}
- public static FlinkCatalog createCatalog(Path warehouse, String catalogName) {
- return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue());
- }
+ public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
+ // manual validation
+ // because different catalog types may have different options
+ // we can't list them all in the optionalOptions() method
+ String warehouse =
+ Preconditions.checkNotNull(
+ options.get(WAREHOUSE),
+ "Table store '" + WAREHOUSE.key() + "' path must be set");
+
+ String metastore = options.get(METASTORE);
+ List<CatalogFactory> factories = new ArrayList<>();
+ ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+ .iterator()
+ .forEachRemaining(
+ f -> {
+ if (f.identifier().equals(metastore)) {
+ factories.add(f);
+ }
+ });
+ if (factories.size() != 1) {
+ throw new RuntimeException(
+ "Found "
+ + factories.size()
+ + " classes implementing "
+ + CatalogFactory.class.getName()
+ + " with metastore "
+ + metastore
+ + ". They are:\n"
+ + factories.stream()
+ .map(t -> t.getClass().getName())
+ .collect(Collectors.joining("\n")));
+ }
- public static FlinkCatalog createCatalog(
- Path warehouse, String catalogName, String defaultDatabase) {
- return new FlinkCatalog(Catalog.create(warehouse), catalogName, defaultDatabase);
+ return new FlinkCatalog(
+ factories.get(0).create(warehouse, options),
+ catalogName,
+ options.get(DEFAULT_DATABASE));
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index 79b8153d..8294294e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
@@ -71,7 +72,9 @@ public class FlinkCatalogTest {
@Before
public void beforeEach() throws IOException {
String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
- catalog = FlinkCatalogFactory.createCatalog(new Path(path), "test-catalog");
+ Configuration conf = new Configuration();
+ conf.setString("warehouse", path);
+ catalog = FlinkCatalogFactory.createCatalog("test-catalog", conf);
}
private ResolvedSchema createSchema() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
new file mode 100644
index 00000000..c3e2dd59
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Common implementation of {@link Catalog}. */
+public abstract class AbstractCatalog implements Catalog {
+
+ protected static final String DB_SUFFIX = ".db";
+
+ @Override
+ public Path getTableLocation(ObjectPath tablePath) {
+ return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
+ }
+
+ protected Path databasePath(String database) {
+ return new Path(warehouse(), database + DB_SUFFIX);
+ }
+
+ protected abstract String warehouse();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 01b23661..65c099f4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -241,9 +241,4 @@ public interface Catalog extends AutoCloseable {
return tablePath;
}
}
-
- /** Create a {@link Catalog} from warehouse path. */
- static Catalog create(Path warehouse) {
- return new FileSystemCatalog(warehouse);
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
new file mode 100644
index 00000000..5e93a3ed
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
+public interface CatalogFactory {
+
+ String identifier();
+
+ Catalog create(String warehouse, ReadableConfig options);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index d4a451ea..451ad8f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.store.file.catalog;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -34,9 +33,7 @@ import java.util.concurrent.Callable;
import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
/** A catalog implementation for {@link FileSystem}. */
-public class FileSystemCatalog implements Catalog {
-
- public static final String DB_SUFFIX = ".db";
+public class FileSystemCatalog extends AbstractCatalog {
private final FileSystem fs;
private final Path warehouse;
@@ -108,14 +105,9 @@ public class FileSystemCatalog implements Catalog {
return tables;
}
- @Override
- public Path getTableLocation(ObjectPath tablePath) {
- return tablePath(tablePath);
- }
-
@Override
public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
- Path path = tablePath(tablePath);
+ Path path = getTableLocation(tablePath);
return new SchemaManager(path)
.latest()
.orElseThrow(() -> new TableNotExistException(tablePath));
@@ -123,7 +115,7 @@ public class FileSystemCatalog implements Catalog {
@Override
public boolean tableExists(ObjectPath tablePath) {
- return tableExists(tablePath(tablePath));
+ return tableExists(getTableLocation(tablePath));
}
private boolean tableExists(Path tablePath) {
@@ -133,7 +125,7 @@ public class FileSystemCatalog implements Catalog {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException {
- Path path = tablePath(tablePath);
+ Path path = getTableLocation(tablePath);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
@@ -152,7 +144,7 @@ public class FileSystemCatalog implements Catalog {
throw new DatabaseNotExistException(tablePath.getDatabaseName());
}
- Path path = tablePath(tablePath);
+ Path path = getTableLocation(tablePath);
if (tableExists(path)) {
if (ignoreIfExists) {
return;
@@ -167,7 +159,7 @@ public class FileSystemCatalog implements Catalog {
@Override
public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
throws TableNotExistException {
- Path path = tablePath(tablePath);
+ Path path = getTableLocation(tablePath);
if (!tableExists(path)) {
if (ignoreIfNotExists) {
return;
@@ -196,19 +188,15 @@ public class FileSystemCatalog implements Catalog {
return name.substring(0, name.length() - DB_SUFFIX.length());
}
- private Path databasePath(String database) {
- return new Path(warehouse, database + DB_SUFFIX);
- }
-
- @VisibleForTesting
- Path tablePath(ObjectPath objectPath) {
- return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
- }
-
private void commitTableChange(Path tablePath, UpdateSchema table) {
uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
}
@Override
public void close() throws Exception {}
+
+ @Override
+ protected String warehouse() {
+ return warehouse.toString();
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
new file mode 100644
index 00000000..c705872e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+
+/** Factory to create {@link FileSystemCatalog}. */
+public class FileSystemCatalogFactory implements CatalogFactory {
+
+ public static final String IDENTIFIER = "filesystem";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Catalog create(String warehouse, ReadableConfig options) {
+ return new FileSystemCatalog(new Path(warehouse));
+ }
+}
diff --git a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
new file mode 100644
index 00000000..3edb81e1
--- /dev/null
+++ b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory
diff --git a/flink-table-store-dist/pom.xml b/flink-table-store-dist/pom.xml
index f58e91f5..8da30d51 100644
--- a/flink-table-store-dist/pom.xml
+++ b/flink-table-store-dist/pom.xml
@@ -62,6 +62,12 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive-catalog</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-kafka</artifactId>
@@ -97,6 +103,7 @@ under the License.
<include>org.apache.flink:flink-table-store-connector</include>
<include>org.apache.flink:flink-table-store-core</include>
<include>org.apache.flink:flink-table-store-format</include>
+ <include>org.apache.flink:flink-table-store-hive-catalog</include>
<include>org.apache.flink:flink-table-store-kafka</include>
<include>org.apache.flink:flink-sql-connector-kafka</include>
</includes>
diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml
index aefec3cb..85a62b5f 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -40,7 +40,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-hive</artifactId>
+ <artifactId>flink-table-store-hive-connector</artifactId>
<version>${project.version}</version>
</dependency>
@@ -105,9 +105,9 @@ under the License.
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-hive</artifactId>
+ <artifactId>flink-table-store-hive-connector</artifactId>
<version>${project.version}</version>
- <destFileName>flink-table-store-hive.jar</destFileName>
+ <destFileName>flink-table-store-hive-connector.jar</destFileName>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/dependencies
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index 563c121b..be375f42 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -65,7 +65,8 @@ public abstract class E2eTestBase {
}
private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
- protected static final String TABLE_STORE_HIVE_JAR_NAME = "flink-table-store-hive.jar";
+ protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
+ "flink-table-store-hive-connector.jar";
private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
protected static final String TEST_DATA_DIR = "/test-data";
@@ -122,7 +123,7 @@ public abstract class E2eTestBase {
jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
copyResource(TABLE_STORE_JAR_NAME);
- copyResource(TABLE_STORE_HIVE_JAR_NAME);
+ copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME);
copyResource(BUNDLED_HADOOP_JAR_NAME);
}
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 98ce0893..9a4f7833 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class HiveE2eTest extends E2eTestBase {
private static final String ADD_JAR_HQL =
- "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_JAR_NAME + ";";
+ "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";";
public HiveE2eTest() {
super(false, true);
@@ -43,7 +43,6 @@ public class HiveE2eTest extends E2eTestBase {
@Test
public void testReadExternalTable() throws Exception {
- // TODO write data directly to HDFS after FLINK-27562 is solved
String tableStorePkDdl =
"CREATE TABLE IF NOT EXISTS table_store_pk (\n"
+ " a int,\n"
@@ -54,7 +53,7 @@ public class HiveE2eTest extends E2eTestBase {
+ " 'bucket' = '2',\n"
+ " 'root-path' = '%s'\n"
+ ");";
- String tableStorePkPath = TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store";
+ String tableStorePkPath = HDFS_ROOT + "/" + UUID.randomUUID().toString() + ".store";
tableStorePkDdl = String.format(tableStorePkDdl, tableStorePkPath);
runSql(
"INSERT INTO table_store_pk VALUES "
@@ -68,8 +67,6 @@ public class HiveE2eTest extends E2eTestBase {
"CREATE EXTERNAL TABLE IF NOT EXISTS table_store_pk\n"
+ "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
+ "LOCATION '"
- // hive cannot read from local path
- + HDFS_ROOT
+ tableStorePkPath
+ "/default_catalog.catalog/default_database.db/table_store_pk';";
writeSharedFile(
@@ -78,13 +75,9 @@ public class HiveE2eTest extends E2eTestBase {
ADD_JAR_HQL
+ "\n"
+ externalTablePkDdl
- + "\n"
- + "SELECT b, a, c FROM table_store_pk ORDER BY b;");
+ + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;");
ContainerState hive = getHive();
- hive.execInContainer("hdfs", "dfs", "-mkdir", "-p", HDFS_ROOT + TEST_DATA_DIR);
- hive.execInContainer(
- "hdfs", "dfs", "-copyFromLocal", tableStorePkPath, HDFS_ROOT + tableStorePkPath);
Container.ExecResult execResult =
hive.execInContainer(
"/opt/hive/bin/hive",
@@ -99,6 +92,54 @@ public class HiveE2eTest extends E2eTestBase {
}
}
+ @Test
+ public void testFlinkWriteAndHiveRead() throws Exception {
+ String sql =
+ String.join(
+ "\n",
+ "CREATE CATALOG my_hive WITH (",
+ " 'type' = 'table-store',",
+ " 'metastore' = 'hive',",
+ " 'uri' = 'thrift://hive-metastore:9083',",
+ " 'warehouse' = '"
+ + HDFS_ROOT
+ + "/"
+ + UUID.randomUUID().toString()
+ + ".warehouse'",
+ ");",
+ "",
+ "USE CATALOG my_hive;",
+ "",
+ "CREATE TABLE T (",
+ " a int,",
+ " b bigint,",
+ " c string",
+ ") WITH (",
+ " 'bucket' = '2'",
+ ");",
+ "",
+ "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');");
+ runSql(sql);
+
+ writeSharedFile(
+ "query.hql",
+ // same default database name as Flink
+ ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;");
+
+ ContainerState hive = getHive();
+ Container.ExecResult execResult =
+ hive.execInContainer(
+ "/opt/hive/bin/hive",
+ "--hiveconf",
+ "hive.root.logger=INFO,console",
+ "-f",
+ TEST_DATA_DIR + "/query.hql");
+ assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n");
+ if (execResult.getExitCode() != 0) {
+ throw new AssertionError("Failed when running hive sql.");
+ }
+ }
+
private ContainerState getHive() {
return environment.getContainerByServiceName("hive-server_1").get();
}
diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 42c89b27..d414d99f 100644
--- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -25,10 +25,10 @@ services:
# ----------------------------------------
jobmanager:
- image: apache/flink:${flink.version}
+ image: apache/flink:${flink.version}-java8
volumes:
- testdata:/test-data
- command: jobmanager
+ entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh jobmanager"
env_file:
- ./flink.env
networks:
@@ -39,10 +39,10 @@ services:
- "8081"
taskmanager:
- image: apache/flink:${flink.version}
+ image: apache/flink:${flink.version}-java8
volumes:
- testdata:/test-data
- command: taskmanager
+ entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager"
env_file:
- ./flink.env
networks:
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
similarity index 81%
copy from flink-table-store-hive/pom.xml
copy to flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
index 1d6bb57d..8d5c6c86 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
@@ -23,68 +23,41 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
+ <artifactId>flink-table-store-hive</artifactId>
<groupId>org.apache.flink</groupId>
<version>0.2-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-hive</artifactId>
- <name>Flink Table Store : Hive</name>
+ <artifactId>flink-table-store-hive-catalog</artifactId>
+ <name>Flink Table Store : Hive Catalog</name>
<packaging>jar</packaging>
- <properties>
- <hiverunner.version>4.0.0</hiverunner.version>
- <reflections.version>0.9.8</reflections.version>
- </properties>
-
<dependencies>
- <!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
+ <artifactId>flink-table-store-core</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive-common</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
+ <artifactId>hive-metastore</artifactId>
<version>${hive.version}</version>
- <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -110,11 +83,6 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
- <!-- this dependency cannot be fetched from central maven repository anymore -->
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>*</artifactId>
- </exclusion>
</exclusions>
</dependency>
@@ -122,26 +90,31 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-core</artifactId>
+ <artifactId>flink-table-store-connector</artifactId>
<version>${project.version}</version>
<scope>test</scope>
- <type>test-jar</type>
</dependency>
- <!--
- IDEA reads classes from the same project from target/classes of that module,
- so even though we've packaged and shaded avro classes into flink-table-store-format.jar
- we still have to include this test dependency here.
- -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive-connector</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-avro</artifactId>
+ <artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
- <!-- dependencies for IT cases -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -158,6 +131,34 @@ under the License.
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>com.klarna</groupId>
<artifactId>hiverunner</artifactId>
@@ -475,11 +476,41 @@ under the License.
<goal>shade</goal>
</goals>
<configuration>
+ <minimizeJar>true</minimizeJar>
<artifactSet>
- <includes combine.children="append">
- <include>org.apache.flink:flink-table-store-shade</include>
+ <includes>
+ <include>org.apache.flink:flink-table-store-hive-common</include>
+ <include>org.apache.thrift:libthrift</include>
+ <include>org.apache.thrift:libfb303</include>
+ <include>com.google.guava:guava</include>
+ <include>org.apache.hive:hive-common</include>
+ <include>org.apache.hive.shims:hive-shims-common</include>
+ <include>org.apache.hive:hive-serde</include>
+ <include>org.apache.hive:hive-metastore</include>
</includes>
</artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.facebook.fb303</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.facebook.fb303</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.hadoop.hive</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hadoop.hive</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.hive</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hive</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.thrift</pattern>
+ <shadedPattern>org.apache.flink.table.store.shaded.org.apache.thrift</shadedPattern>
+ </relocation>
+ </relocations>
</configuration>
</execution>
</executions>
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
new file mode 100644
index 00000000..50a3f635
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.AbstractCatalog;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A catalog implementation for Hive. */
+public class HiveCatalog extends AbstractCatalog {
+
+ // we don't include flink-table-store-hive-connector as dependencies because it depends on
+ // hive-exec
+ private static final String INPUT_FORMAT_CLASS_NAME =
+ "org.apache.flink.table.store.mapred.TableStoreInputFormat";
+ private static final String OUTPUT_FORMAT_CLASS_NAME =
+ "org.apache.flink.table.store.mapred.TableStoreOutputFormat";
+ private static final String SERDE_CLASS_NAME =
+ "org.apache.flink.table.store.hive.TableStoreSerDe";
+ private static final String STORAGE_HANDLER_CLASS_NAME =
+ "org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
+
+ private final HiveConf hiveConf;
+ private final IMetaStoreClient client;
+
+ public HiveCatalog(String thriftUri, String warehousePath) {
+ Configuration conf = new Configuration();
+ conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+ conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
+ this.hiveConf = new HiveConf(conf, HiveConf.class);
+ try {
+ IMetaStoreClient client =
+ RetryingMetaStoreClient.getProxy(
+ hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+ this.client =
+ StringUtils.isNullOrWhitespaceOnly(thriftUri)
+ ? client
+ : HiveMetaStoreClient.newSynchronizedClient(client);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List<String> listDatabases() {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
+ throw new RuntimeException("Failed to list all databases", e);
+ }
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) {
+ try {
+ client.getDatabase(databaseName);
+ return true;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Failed to determine if database " + databaseName + " exists", e);
+ }
+ }
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException {
+ try {
+ client.createDatabase(convertToDatabase(name));
+ } catch (AlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new DatabaseAlreadyExistException(name, e);
+ }
+ } catch (TException e) {
+ throw new RuntimeException("Failed to create database " + name, e);
+ }
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ try {
+ if (!cascade && client.getAllTables(name).size() > 0) {
+ throw new DatabaseNotEmptyException(name);
+ }
+ client.dropDatabase(name, true, false, true);
+ } catch (NoSuchObjectException | UnknownDBException e) {
+ if (!ignoreIfNotExists) {
+ throw new DatabaseNotExistException(name, e);
+ }
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop database " + name, e);
+ }
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws DatabaseNotExistException {
+ try {
+ return client.getAllTables(databaseName);
+ } catch (UnknownDBException e) {
+ throw new DatabaseNotExistException(databaseName, e);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
+ }
+ }
+
+ @Override
+ public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+ if (isTableStoreTableNotExisted(tablePath)) {
+ throw new TableNotExistException(tablePath);
+ }
+ Path tableLocation = getTableLocation(tablePath);
+ return new SchemaManager(tableLocation)
+ .latest()
+ .orElseThrow(
+ () -> new RuntimeException("There is no table stored in " + tableLocation));
+ }
+
+ @Override
+ public boolean tableExists(ObjectPath tablePath) {
+ try {
+ client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ return true;
+ } catch (NoSuchObjectException e) {
+ return false;
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Failed to determine if table " + tablePath.getFullName() + " exists", e);
+ }
+ }
+
+ @Override
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ if (isTableStoreTableNotExisted(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException(tablePath);
+ }
+ }
+
+ try {
+ client.dropTable(
+ tablePath.getDatabaseName(), tablePath.getObjectName(), true, false, true);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to drop table " + tablePath.getFullName(), e);
+ }
+ }
+
+ @Override
+ public void createTable(ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ String databaseName = tablePath.getDatabaseName();
+ if (!databaseExists(databaseName)) {
+ throw new DatabaseNotExistException(databaseName);
+ }
+ if (tableExists(tablePath)) {
+ if (ignoreIfExists) {
+ return;
+ } else {
+ throw new TableAlreadyExistException(tablePath);
+ }
+ }
+
+ // first commit changes to underlying files
+ // if changes on Hive fails there is no harm to perform the same changes to files again
+ TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+ Table table = newHmsTable(tablePath);
+ updateHmsTable(table, tablePath, schema);
+ try {
+ client.createTable(table);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to create table " + tablePath.getFullName(), e);
+ }
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ if (isTableStoreTableNotExisted(tablePath)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException(tablePath);
+ }
+ }
+
+ // first commit changes to underlying files
+ // if changes on Hive fails there is no harm to perform the same changes to files again
+ TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+ try {
+ Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ updateHmsTable(table, tablePath, schema);
+ client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ }
+
+ @Override
+ protected String warehouse() {
+ return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+ }
+
+ private Database convertToDatabase(String name) {
+ Database database = new Database();
+ database.setName(name);
+ database.setLocationUri(databasePath(name).toString());
+ return database;
+ }
+
+ private Table newHmsTable(ObjectPath tablePath) {
+ long currentTimeMillis = System.currentTimeMillis();
+ Table table =
+ new Table(
+ tablePath.getObjectName(),
+ tablePath.getDatabaseName(),
+ // current linux user
+ System.getProperty("user.name"),
+ (int) (currentTimeMillis / 1000),
+ (int) (currentTimeMillis / 1000),
+ Integer.MAX_VALUE,
+ null,
+ Collections.emptyList(),
+ new HashMap<>(),
+ null,
+ null,
+ TableType.MANAGED_TABLE.toString());
+ table.getParameters()
+ .put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
+ return table;
+ }
+
+ private void updateHmsTable(Table table, ObjectPath tablePath, TableSchema schema) {
+ StorageDescriptor sd = convertToStorageDescriptor(tablePath, schema);
+ table.setSd(sd);
+ }
+
+ private StorageDescriptor convertToStorageDescriptor(ObjectPath tablePath, TableSchema schema) {
+ StorageDescriptor sd = new StorageDescriptor();
+
+ sd.setCols(
+ schema.fields().stream()
+ .map(this::convertToFieldSchema)
+ .collect(Collectors.toList()));
+ sd.setLocation(getTableLocation(tablePath).toString());
+
+ sd.setInputFormat(INPUT_FORMAT_CLASS_NAME);
+ sd.setOutputFormat(OUTPUT_FORMAT_CLASS_NAME);
+
+ SerDeInfo serDeInfo = new SerDeInfo();
+ serDeInfo.setParameters(new HashMap<>());
+ serDeInfo.setSerializationLib(SERDE_CLASS_NAME);
+ sd.setSerdeInfo(serDeInfo);
+
+ return sd;
+ }
+
+ private FieldSchema convertToFieldSchema(DataField dataField) {
+ return new FieldSchema(
+ dataField.name(),
+ HiveTypeUtils.logicalTypeToTypeInfo(dataField.type().logicalType()).getTypeName(),
+ dataField.description());
+ }
+
+ private boolean isTableStoreTableNotExisted(ObjectPath tablePath) {
+ Table table;
+ try {
+ table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+ } catch (NoSuchObjectException e) {
+ return true;
+ } catch (TException e) {
+ throw new RuntimeException(
+ "Cannot determine if table "
+ + tablePath.getFullName()
+ + " is a table store table.",
+ e);
+ }
+
+ if (!INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+ || !OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat())) {
+ throw new IllegalArgumentException(
+ "Table "
+ + tablePath.getFullName()
+ + " is not a table store table. It's input format is "
+ + table.getSd().getInputFormat()
+ + " and its output format is "
+ + table.getSd().getOutputFormat());
+ }
+ return false;
+ }
+
+ private TableSchema commitToUnderlyingFiles(ObjectPath tablePath, UpdateSchema schema) {
+ Path path = getTableLocation(tablePath);
+ try {
+ return new SchemaManager(path).commitNewVersion(schema);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to commit changes of table "
+ + tablePath.getFullName()
+ + " to underlying files",
+ e);
+ }
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
new file mode 100644
index 00000000..1e2cbabb
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
+
+/** Factory to create {@link HiveCatalog}. */
+public class HiveCatalogFactory implements CatalogFactory {
+
+ private static final String IDENTIFIER = "hive";
+
+ private static final ConfigOption<String> URI =
+ ConfigOptions.key("uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Uri of Hive metastore's thrift server.");
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Catalog create(String warehouse, ReadableConfig options) {
+ String uri =
+ Preconditions.checkNotNull(
+ options.get(URI),
+ URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
+ return new HiveCatalog(uri, warehouse);
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
new file mode 100644
index 00000000..e9404fcd
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.hive.HiveCatalogFactory
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
new file mode 100644
index 00000000..1b5654b7
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** IT cases for {@link HiveCatalog}. */
+@RunWith(FlinkEmbeddedHiveRunner.class)
+public class HiveCatalogITCase {
+
+ @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+ private String path;
+ private TableEnvironment tEnv;
+
+ @HiveSQL(files = {})
+ private static HiveShell hiveShell;
+
+ @Before
+ public void before() throws Exception {
+ hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+ hiveShell.execute("USE test_db");
+ hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+ hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
+
+ path = folder.newFolder().toURI().toString();
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+ tEnv = TableEnvironmentImpl.create(settings);
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG my_hive WITH (",
+ " 'type' = 'table-store',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG my_hive").await();
+ tEnv.executeSql("USE test_db").await();
+ }
+
+ @After
+ public void after() {
+ hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+ hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
+ }
+
+ @Test
+ public void testDatabaseOperations() throws Exception {
+ // create database
+ tEnv.executeSql("CREATE DATABASE test_db2").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("default"), Row.of("test_db"), Row.of("test_db2")),
+ collect("SHOW DATABASES"));
+ tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db2").await();
+ try {
+ tEnv.executeSql("CREATE DATABASE test_db2").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Database test_db2 already exists in Catalog my_hive");
+ }
+
+ // drop database
+ tEnv.executeSql("DROP DATABASE test_db2").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+ tEnv.executeSql("DROP DATABASE IF EXISTS test_db2").await();
+ try {
+ tEnv.executeSql("DROP DATABASE test_db2").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Database test_db2 does not exist in Catalog my_hive");
+ }
+
+ // drop non-empty database
+ tEnv.executeSql("CREATE DATABASE test_db2").await();
+ tEnv.executeSql("USE test_db2").await();
+ tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Path tablePath = new Path(path, "test_db2.db/T");
+ Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+ try {
+ tEnv.executeSql("DROP DATABASE test_db2").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Database test_db2 in catalog my_hive is not empty");
+ }
+ tEnv.executeSql("DROP DATABASE test_db2 CASCADE").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+ Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+ }
+
+ @Test
+ public void testTableOperations() throws Exception {
+ // create table
+ tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("hive_table"), Row.of("s"), Row.of("t")),
+ collect("SHOW TABLES"));
+ tEnv.executeSql(
+ "CREATE TABLE IF NOT EXISTS S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ try {
+ tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table (or view) test_db.S already exists in Catalog my_hive");
+ }
+
+ // drop table
+ tEnv.executeSql("INSERT INTO S VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Path tablePath = new Path(path, "test_db.db/S");
+ Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+ tEnv.executeSql("DROP TABLE S").await();
+ Assert.assertEquals(
+ Arrays.asList(Row.of("hive_table"), Row.of("t")), collect("SHOW TABLES"));
+ Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+ tEnv.executeSql("DROP TABLE IF EXISTS S").await();
+ try {
+ tEnv.executeSql("DROP TABLE S").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table with identifier 'my_hive.test_db.S' does not exist");
+ }
+ try {
+ tEnv.executeSql("DROP TABLE hive_table").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table test_db.hive_table is not a table store table");
+ }
+
+ // alter table
+ tEnv.executeSql("ALTER TABLE T SET ( 'manifest.target-file-size' = '16MB' )").await();
+ List<Row> actual = collect("SHOW CREATE TABLE T");
+ Assert.assertEquals(1, actual.size());
+ Assert.assertTrue(
+ actual.get(0)
+ .getField(0)
+ .toString()
+ .contains("'manifest.target-file-size' = '16MB'"));
+ try {
+ tEnv.executeSql("ALTER TABLE S SET ( 'manifest.target-file-size' = '16MB' )").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table `my_hive`.`test_db`.`S` doesn't exist or is a temporary table");
+ }
+ try {
+ tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )")
+ .await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table test_db.hive_table is not a table store table");
+ }
+ }
+
+ @Test
+ public void testFlinkWriteAndHiveRead() throws Exception {
+ tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+ .await();
+ tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Assert.assertEquals(
+ Arrays.asList("1\tHi", "2\tHello"),
+ hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
+
+ try {
+ tEnv.executeSql("INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')").await();
+ Assert.fail("No exception is thrown");
+ } catch (Throwable t) {
+ ExceptionUtils.assertThrowableWithMessage(
+ t, "Table test_db.hive_table is not a table store table");
+ }
+ }
+
+ private List<Row> collect(String sql) throws Exception {
+ List<Row> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-common/pom.xml b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
new file mode 100644
index 00000000..a87aa767
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-hive</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.2-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-hive-common</artifactId>
+ <name>Flink Table Store : Hive Common</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>${hive.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
similarity index 55%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
rename to flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
index d6061b12..b9af52f6 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
@@ -18,14 +18,6 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreCharObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreDateObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreDecimalObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreListObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreMapObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreStringObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreTimestampObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreVarcharObjectInspector;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -33,9 +25,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -93,48 +82,4 @@ public class HiveTypeUtils {
"Unsupported logical type " + logicalType.asSummaryString());
}
}
-
- public static ObjectInspector getObjectInspector(LogicalType logicalType) {
- switch (logicalType.getTypeRoot()) {
- case BOOLEAN:
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- case BIGINT:
- case FLOAT:
- case DOUBLE:
- case BINARY:
- case VARBINARY:
- return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
- (PrimitiveTypeInfo) logicalTypeToTypeInfo(logicalType));
- case DECIMAL:
- DecimalType decimalType = (DecimalType) logicalType;
- return new TableStoreDecimalObjectInspector(
- decimalType.getPrecision(), decimalType.getScale());
- case CHAR:
- CharType charType = (CharType) logicalType;
- return new TableStoreCharObjectInspector(charType.getLength());
- case VARCHAR:
- VarCharType varCharType = (VarCharType) logicalType;
- if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
- return new TableStoreStringObjectInspector();
- } else {
- return new TableStoreVarcharObjectInspector(varCharType.getLength());
- }
- case DATE:
- return new TableStoreDateObjectInspector();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- return new TableStoreTimestampObjectInspector();
- case ARRAY:
- ArrayType arrayType = (ArrayType) logicalType;
- return new TableStoreListObjectInspector(arrayType.getElementType());
- case MAP:
- MapType mapType = (MapType) logicalType;
- return new TableStoreMapObjectInspector(
- mapType.getKeyType(), mapType.getValueType());
- default:
- throw new UnsupportedOperationException(
- "Unsupported logical type " + logicalType.asSummaryString());
- }
- }
}
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
similarity index 97%
copy from flink-table-store-hive/pom.xml
copy to flink-table-store-hive/flink-table-store-hive-connector/pom.xml
index 1d6bb57d..23598562 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
@@ -23,22 +23,23 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>flink-table-store-parent</artifactId>
+ <artifactId>flink-table-store-hive</artifactId>
<groupId>org.apache.flink</groupId>
<version>0.2-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-hive</artifactId>
- <name>Flink Table Store : Hive</name>
+ <artifactId>flink-table-store-hive-connector</artifactId>
+ <name>Flink Table Store : Hive Connector</name>
<packaging>jar</packaging>
- <properties>
- <hiverunner.version>4.0.0</hiverunner.version>
- <reflections.version>0.9.8</reflections.version>
- </properties>
-
<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-hive-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Flink All dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -477,6 +478,7 @@ under the License.
<configuration>
<artifactSet>
<includes combine.children="append">
+ <include>org.apache.flink:flink-table-store-hive-common</include>
<include>org.apache.flink:flink-table-store-shade</include>
</includes>
</artifactSet>
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
similarity index 95%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
index 02b0e724..225d2ed2 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -41,7 +40,7 @@ public class TableStoreListObjectInspector implements ListObjectInspector {
private final ArrayData.ElementGetter elementGetter;
public TableStoreListObjectInspector(LogicalType elementType) {
- this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementType);
+ this.elementObjectInspector = TableStoreObjectInspectorFactory.create(elementType);
this.elementGetter = ArrayData.createElementGetter(elementType);
}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
similarity index 94%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
index 4b4d606f..c37d3f2e 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -45,8 +44,8 @@ public class TableStoreMapObjectInspector implements MapObjectInspector {
private final ArrayData.ElementGetter valueGetter;
public TableStoreMapObjectInspector(LogicalType keyType, LogicalType valueType) {
- this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyType);
- this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueType);
+ this.keyObjectInspector = TableStoreObjectInspectorFactory.create(keyType);
+ this.valueObjectInspector = TableStoreObjectInspectorFactory.create(valueType);
this.keyGetter = ArrayData.createElementGetter(keyType);
this.valueGetter = ArrayData.createElementGetter(valueType);
}
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java
new file mode 100644
index 00000000..dca50ca0
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive.objectinspector;
+
+import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/** Factory to create {@link ObjectInspector}s according to the given {@link LogicalType}. */
+public class TableStoreObjectInspectorFactory {
+
+ public static ObjectInspector create(LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case VARBINARY:
+ return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+ (PrimitiveTypeInfo) HiveTypeUtils.logicalTypeToTypeInfo(logicalType));
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ return new TableStoreDecimalObjectInspector(
+ decimalType.getPrecision(), decimalType.getScale());
+ case CHAR:
+ CharType charType = (CharType) logicalType;
+ return new TableStoreCharObjectInspector(charType.getLength());
+ case VARCHAR:
+ VarCharType varCharType = (VarCharType) logicalType;
+ if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+ return new TableStoreStringObjectInspector();
+ } else {
+ return new TableStoreVarcharObjectInspector(varCharType.getLength());
+ }
+ case DATE:
+ return new TableStoreDateObjectInspector();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return new TableStoreTimestampObjectInspector();
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ return new TableStoreListObjectInspector(arrayType.getElementType());
+ case MAP:
+ MapType mapType = (MapType) logicalType;
+ return new TableStoreMapObjectInspector(
+ mapType.getKeyType(), mapType.getValueType());
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported logical type " + logicalType.asSummaryString());
+ }
+ }
+}
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
similarity index 98%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index 9545ccdc..a783be83 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -51,7 +51,7 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
TableStoreStructField structField =
new TableStoreStructField(
name,
- HiveTypeUtils.getObjectInspector(logicalType),
+ TableStoreObjectInspectorFactory.create(logicalType),
i,
RowData.createFieldGetter(logicalType, i),
fieldComments.get(i));
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
similarity index 99%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 5b518d50..d6e87bf7 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.store.FileStoreTestUtils;
import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.types.logical.LogicalType;
@@ -225,7 +226,7 @@ public class TableStoreHiveStorageHandlerITCase {
continue;
}
ObjectInspector oi =
- HiveTypeUtils.getObjectInspector(
+ TableStoreObjectInspectorFactory.create(
RandomGenericRowDataGenerator.LOGICAL_TYPES.get(i));
switch (oi.getCategory()) {
case PRIMITIVE:
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
diff --git a/flink-table-store-hive/src/test/resources/log4j2-test.properties b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-hive/src/test/resources/log4j2-test.properties
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/pom.xml
index 1d6bb57d..ebf590fb 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/pom.xml
@@ -31,459 +31,18 @@ under the License.
<artifactId>flink-table-store-hive</artifactId>
<name>Flink Table Store : Hive</name>
- <packaging>jar</packaging>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-table-store-hive-catalog</module>
+ <module>flink-table-store-hive-common</module>
+ <module>flink-table-store-hive-connector</module>
+ </modules>
<properties>
+ <hive.version>2.3.4</hive.version>
<hiverunner.version>4.0.0</hiverunner.version>
<reflections.version>0.9.8</reflections.version>
</properties>
- <dependencies>
- <!-- Flink All dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-shade</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>${hive.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.orc</groupId>
- <artifactId>orc-core</artifactId>
- </exclusion>
- <!-- this dependency cannot be fetched from central maven repository anymore -->
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- test dependencies -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-store-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <!--
- IDEA reads classes from the same project from target/classes of that module,
- so even though we've packaged and shaded avro classes into flink-table-store-format.jar
- we still have to include this test dependency here.
- -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-sql-avro</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- dependencies for IT cases -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
-
- <dependency>
- <groupId>com.klarna</groupId>
- <artifactId>hiverunner</artifactId>
- <version>${hiverunner.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-serde</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-contrib</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-webhcat-java-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.tez</groupId>
- <artifactId>tez-common</artifactId>
- </exclusion>
- <exclusion>
- <!-- This dependency is no longer shipped with the JDK since Java 9.-->
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-auth</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-annotations</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-hdfs</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-api</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-client</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-web-proxy</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-shim</artifactId>
- <groupId>org.apache.tez</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jms</artifactId>
- <groupId>javax.jms</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.reflections</groupId>
- <artifactId>reflections</artifactId>
- <version>${reflections.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-service</artifactId>
- <version>${hive.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-metastore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <!-- This dependency is no longer shipped with the JDK since Java 9.-->
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-auth</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-client</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-annotations</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-hdfs</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-api</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-registry</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hbase-hadoop-compat</artifactId>
- <groupId>org.apache.hbase</groupId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <version>${hive.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-archives</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-annotations</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-hdfs</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>apache-log4j-extras</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-webhcat-java-client</artifactId>
- <version>${hive.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>jms</artifactId>
- <groupId>javax.jms</groupId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit4.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- <version>${junit5.version}</version>
- <exclusions>
- <exclusion>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- </exclusion>
- </exclusions>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <executions>
- <execution>
- <id>shade-flink</id>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <artifactSet>
- <includes combine.children="append">
- <include>org.apache.flink:flink-table-store-shade</include>
- </includes>
- </artifactSet>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d2f88bc4..92d48714 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@ under the License.
<flink.shaded.version>15.0</flink.shaded.version>
<flink.shaded.jackson.version>2.12.4</flink.shaded.jackson.version>
<hadoop.version>2.8.5</hadoop.version>
- <hive.version>2.3.4</hive.version>
<scala.version>2.12.7</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<snappy.version>1.1.8.3</snappy.version>