Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/30 09:05:15 UTC

[flink-table-store] branch master updated: [FLINK-28067] Introduce HiveCatalog

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new cb688c5b [FLINK-28067] Introduce HiveCatalog
cb688c5b is described below

commit cb688c5bed4e02ceff608d85a6578a73f4fe2c1f
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jun 30 17:05:10 2022 +0800

    [FLINK-28067] Introduce HiveCatalog
    
    This closes #181
---
 .../flink/table/store/connector/FlinkCatalog.java  |  12 +-
 .../table/store/connector/FlinkCatalogFactory.java |  82 ++--
 .../table/store/connector/FlinkCatalogTest.java    |   5 +-
 .../table/store/file/catalog/AbstractCatalog.java  |  39 ++
 .../flink/table/store/file/catalog/Catalog.java    |   5 -
 .../table/store/file/catalog/CatalogFactory.java   |  29 ++
 .../store/file/catalog/FileSystemCatalog.java      |  34 +-
 .../file/catalog/FileSystemCatalogFactory.java     |  38 ++
 ...e.flink.table.store.file.catalog.CatalogFactory |  16 +
 flink-table-store-dist/pom.xml                     |   7 +
 flink-table-store-e2e-tests/pom.xml                |   6 +-
 .../flink/table/store/tests/E2eTestBase.java       |   5 +-
 .../flink/table/store/tests/HiveE2eTest.java       |  61 ++-
 .../test/resources-filtered/docker-compose.yaml    |   8 +-
 .../{ => flink-table-store-hive-catalog}/pom.xml   | 141 ++++---
 .../apache/flink/table/store/hive/HiveCatalog.java | 356 ++++++++++++++++
 .../flink/table/store/hive/HiveCatalogFactory.java |  52 +++
 ...e.flink.table.store.file.catalog.CatalogFactory |  16 +
 .../flink/table/store/hive/HiveCatalogITCase.java  | 235 +++++++++++
 .../flink-table-store-hive-common/pom.xml          |  77 ++++
 .../flink/table/store/hive/HiveTypeUtils.java      |  55 ---
 .../{ => flink-table-store-hive-connector}/pom.xml |  18 +-
 .../apache/flink/table/store/RowDataContainer.java |   0
 .../store/SearchArgumentToPredicateConverter.java  |   0
 .../flink/table/store/TableStoreJobConf.java       |   0
 .../apache/flink/table/store/hive/HiveSchema.java  |   0
 .../table/store/hive/TableStoreHiveMetaHook.java   |   0
 .../store/hive/TableStoreHiveStorageHandler.java   |   0
 .../flink/table/store/hive/TableStoreSerDe.java    |   0
 .../TableStoreCharObjectInspector.java             |   0
 .../TableStoreDateObjectInspector.java             |   0
 .../TableStoreDecimalObjectInspector.java          |   0
 .../TableStoreListObjectInspector.java             |   3 +-
 .../TableStoreMapObjectInspector.java              |   5 +-
 .../TableStoreObjectInspectorFactory.java          |  79 ++++
 .../TableStoreRowDataObjectInspector.java          |   2 +-
 .../TableStoreStringObjectInspector.java           |   0
 .../TableStoreTimestampObjectInspector.java        |   0
 .../TableStoreVarcharObjectInspector.java          |   0
 .../table/store/mapred/TableStoreInputFormat.java  |   0
 .../table/store/mapred/TableStoreInputSplit.java   |   0
 .../table/store/mapred/TableStoreOutputFormat.java |   0
 .../table/store/mapred/TableStoreRecordReader.java |   0
 .../flink/table/store/FileStoreTestUtils.java      |   0
 .../SearchArgumentToPredicateConverterTest.java    |   0
 .../table/store/hive/HiveTableSchemaTest.java      |   0
 .../store/hive/RandomGenericRowDataGenerator.java  |   0
 .../hive/TableStoreHiveStorageHandlerITCase.java   |   3 +-
 .../table/store/hive/TableStoreSerDeTest.java      |   0
 .../TableStoreCharObjectInspectorTest.java         |   0
 .../TableStoreDateObjectInspectorTest.java         |   0
 .../TableStoreDecimalObjectInspectorTest.java      |   0
 .../TableStoreListObjectInspectorTest.java         |   0
 .../TableStoreMapObjectInspectorTest.java          |   0
 .../TableStoreRowDataObjectInspectorTest.java      |   0
 .../TableStoreStringObjectInspectorTest.java       |   0
 .../TableStoreTimestampObjectInspectorTest.java    |   0
 .../TableStoreVarcharObjectInspectorTest.java      |   0
 .../store/mapred/TableStoreInputSplitTest.java     |   0
 .../store/mapred/TableStoreRecordReaderTest.java   |   0
 .../src/test/resources/log4j2-test.properties      |   0
 flink-table-store-hive/pom.xml                     | 459 +--------------------
 pom.xml                                            |   1 -
 63 files changed, 1195 insertions(+), 654 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 232015e5..56840aa3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -220,13 +220,19 @@ public class FlinkCatalog extends AbstractCatalog {
         return UpdateSchema.fromCatalogTable(table);
     }
 
-    // --------------------- unsupported methods ----------------------------
-
     @Override
     public final void open() throws CatalogException {}
 
     @Override
-    public final void close() throws CatalogException {}
+    public final void close() throws CatalogException {
+        try {
+            catalog.close();
+        } catch (Exception e) {
+            throw new CatalogException("Failed to close catalog " + catalog.toString(), e);
+        }
+    }
+
+    // --------------------- unsupported methods ----------------------------
 
     @Override
     public CatalogDatabase getDatabase(String databaseName) throws CatalogException {
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 6937e888..6021260f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -21,29 +21,34 @@ package org.apache.flink.table.store.connector;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory;
+import org.apache.flink.util.Preconditions;
 
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ServiceLoader;
 import java.util.Set;
-
-import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
+import java.util.stream.Collectors;
 
 /** Factory for {@link FlinkCatalog}. */
-public class FlinkCatalogFactory implements CatalogFactory {
+public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory {
 
     public static final String IDENTIFIER = "table-store";
 
-    public static final ConfigOption<String> DEFAULT_DATABASE =
-            ConfigOptions.key("default-database").stringType().defaultValue("default");
-
-    public static final ConfigOption<String> WAREHOUSE =
+    public static final ConfigOption<String> METASTORE =
+            ConfigOptions.key("metastore")
+                    .stringType()
+                    .defaultValue(FileSystemCatalogFactory.IDENTIFIER);
+    private static final ConfigOption<String> WAREHOUSE =
             ConfigOptions.key("warehouse")
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The warehouse root path of catalog.");
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key("default-database").stringType().defaultValue("default");
 
     @Override
     public String factoryIdentifier() {
@@ -52,35 +57,58 @@ public class FlinkCatalogFactory implements CatalogFactory {
 
     @Override
     public Set<ConfigOption<?>> requiredOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(WAREHOUSE);
-        return options;
+        return Collections.emptySet();
     }
 
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(DEFAULT_DATABASE);
-        options.add(PROPERTY_VERSION);
-        return options;
+        return Collections.emptySet();
     }
 
     @Override
     public FlinkCatalog createCatalog(Context context) {
         FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
-        helper.validate();
         ReadableConfig options = helper.getOptions();
-        return createCatalog(
-                new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE));
+        return createCatalog(context.getName(), options);
     }
 
-    public static FlinkCatalog createCatalog(Path warehouse, String catalogName) {
-        return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue());
-    }
+    public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) {
+        // manual validation
+        // because different catalog types may have different options
+        // we can't list them all in the optionalOptions() method
+        String warehouse =
+                Preconditions.checkNotNull(
+                        options.get(WAREHOUSE),
+                        "Table store '" + WAREHOUSE.key() + "' path must be set");
+
+        String metastore = options.get(METASTORE);
+        List<CatalogFactory> factories = new ArrayList<>();
+        ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+                .iterator()
+                .forEachRemaining(
+                        f -> {
+                            if (f.identifier().equals(metastore)) {
+                                factories.add(f);
+                            }
+                        });
+        if (factories.size() != 1) {
+            throw new RuntimeException(
+                    "Found "
+                            + factories.size()
+                            + " classes implementing "
+                            + CatalogFactory.class.getName()
+                            + " with metastore "
+                            + metastore
+                            + ". They are:\n"
+                            + factories.stream()
+                                    .map(t -> t.getClass().getName())
+                                    .collect(Collectors.joining("\n")));
+        }
 
-    public static FlinkCatalog createCatalog(
-            Path warehouse, String catalogName, String defaultDatabase) {
-        return new FlinkCatalog(Catalog.create(warehouse), catalogName, defaultDatabase);
+        return new FlinkCatalog(
+                factories.get(0).create(warehouse, options),
+                catalogName,
+                options.get(DEFAULT_DATABASE));
     }
 }
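
A minimal sketch of the reworked entry point, assuming a local warehouse path (values are illustrative; this mirrors the updated FlinkCatalogTest below):

    Configuration conf = new Configuration();
    conf.setString("warehouse", "/tmp/warehouse");
    // "metastore" defaults to "filesystem"; set it to "hive" to select the new HiveCatalog
    conf.setString("metastore", "filesystem");
    FlinkCatalog catalog = FlinkCatalogFactory.createCatalog("my-catalog", conf);
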
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index 79b8153d..8294294e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
@@ -71,7 +72,9 @@ public class FlinkCatalogTest {
     @Before
     public void beforeEach() throws IOException {
         String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
-        catalog = FlinkCatalogFactory.createCatalog(new Path(path), "test-catalog");
+        Configuration conf = new Configuration();
+        conf.setString("warehouse", path);
+        catalog = FlinkCatalogFactory.createCatalog("test-catalog", conf);
     }
 
     private ResolvedSchema createSchema() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
new file mode 100644
index 00000000..c3e2dd59
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+
+/** Common implementation of {@link Catalog}. */
+public abstract class AbstractCatalog implements Catalog {
+
+    protected static final String DB_SUFFIX = ".db";
+
+    @Override
+    public Path getTableLocation(ObjectPath tablePath) {
+        return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName());
+    }
+
+    protected Path databasePath(String database) {
+        return new Path(warehouse(), database + DB_SUFFIX);
+    }
+
+    protected abstract String warehouse();
+}
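
A minimal sketch of the path layout this base class fixes, assuming warehouse() returns "/warehouse" (database and table names are hypothetical):

    // databasePath("mydb")                      -> /warehouse/mydb.db
    // getTableLocation(new ObjectPath(...))     -> /warehouse/mydb.db/t
    Path location = catalog.getTableLocation(new ObjectPath("mydb", "t"));
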
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 01b23661..65c099f4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -241,9 +241,4 @@ public interface Catalog extends AutoCloseable {
             return tablePath;
         }
     }
-
-    /** Create a {@link Catalog} from warehouse path. */
-    static Catalog create(Path warehouse) {
-        return new FileSystemCatalog(warehouse);
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
new file mode 100644
index 00000000..5e93a3ed
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Factory to create {@link Catalog}. Each factory should have a unique identifier. */
+public interface CatalogFactory {
+
+    String identifier();
+
+    Catalog create(String warehouse, ReadableConfig options);
+}
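
Implementations are discovered through Java's ServiceLoader (see the META-INF/services file added below), so a hypothetical third-party metastore could plug in like this (class name and identifier are invented for illustration):

    public class MyCatalogFactory implements CatalogFactory {
        @Override
        public String identifier() {
            return "my-metastore"; // matched against the "metastore" catalog option
        }

        @Override
        public Catalog create(String warehouse, ReadableConfig options) {
            return new MyCatalog(warehouse, options); // MyCatalog is a hypothetical Catalog
        }
    }
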
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index d4a451ea..451ad8f5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.catalog;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -34,9 +33,7 @@ import java.util.concurrent.Callable;
 import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus;
 
 /** A catalog implementation for {@link FileSystem}. */
-public class FileSystemCatalog implements Catalog {
-
-    public static final String DB_SUFFIX = ".db";
+public class FileSystemCatalog extends AbstractCatalog {
 
     private final FileSystem fs;
     private final Path warehouse;
@@ -108,14 +105,9 @@ public class FileSystemCatalog implements Catalog {
         return tables;
     }
 
-    @Override
-    public Path getTableLocation(ObjectPath tablePath) {
-        return tablePath(tablePath);
-    }
-
     @Override
     public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
-        Path path = tablePath(tablePath);
+        Path path = getTableLocation(tablePath);
         return new SchemaManager(path)
                 .latest()
                 .orElseThrow(() -> new TableNotExistException(tablePath));
@@ -123,7 +115,7 @@ public class FileSystemCatalog implements Catalog {
 
     @Override
     public boolean tableExists(ObjectPath tablePath) {
-        return tableExists(tablePath(tablePath));
+        return tableExists(getTableLocation(tablePath));
     }
 
     private boolean tableExists(Path tablePath) {
@@ -133,7 +125,7 @@ public class FileSystemCatalog implements Catalog {
     @Override
     public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        Path path = tablePath(tablePath);
+        Path path = getTableLocation(tablePath);
         if (!tableExists(path)) {
             if (ignoreIfNotExists) {
                 return;
@@ -152,7 +144,7 @@ public class FileSystemCatalog implements Catalog {
             throw new DatabaseNotExistException(tablePath.getDatabaseName());
         }
 
-        Path path = tablePath(tablePath);
+        Path path = getTableLocation(tablePath);
         if (tableExists(path)) {
             if (ignoreIfExists) {
                 return;
@@ -167,7 +159,7 @@ public class FileSystemCatalog implements Catalog {
     @Override
     public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists)
             throws TableNotExistException {
-        Path path = tablePath(tablePath);
+        Path path = getTableLocation(tablePath);
         if (!tableExists(path)) {
             if (ignoreIfNotExists) {
                 return;
@@ -196,19 +188,15 @@ public class FileSystemCatalog implements Catalog {
         return name.substring(0, name.length() - DB_SUFFIX.length());
     }
 
-    private Path databasePath(String database) {
-        return new Path(warehouse, database + DB_SUFFIX);
-    }
-
-    @VisibleForTesting
-    Path tablePath(ObjectPath objectPath) {
-        return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
-    }
-
     private void commitTableChange(Path tablePath, UpdateSchema table) {
         uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table));
     }
 
     @Override
     public void close() throws Exception {}
+
+    @Override
+    protected String warehouse() {
+        return warehouse.toString();
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
new file mode 100644
index 00000000..c705872e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.catalog;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.Path;
+
+/** Factory to create {@link FileSystemCatalog}. */
+public class FileSystemCatalogFactory implements CatalogFactory {
+
+    public static final String IDENTIFIER = "filesystem";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Catalog create(String warehouse, ReadableConfig options) {
+        return new FileSystemCatalog(new Path(warehouse));
+    }
+}
diff --git a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
new file mode 100644
index 00000000..3edb81e1
--- /dev/null
+++ b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory
diff --git a/flink-table-store-dist/pom.xml b/flink-table-store-dist/pom.xml
index f58e91f5..8da30d51 100644
--- a/flink-table-store-dist/pom.xml
+++ b/flink-table-store-dist/pom.xml
@@ -62,6 +62,12 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-catalog</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-kafka</artifactId>
@@ -97,6 +103,7 @@ under the License.
                                     <include>org.apache.flink:flink-table-store-connector</include>
                                     <include>org.apache.flink:flink-table-store-core</include>
                                     <include>org.apache.flink:flink-table-store-format</include>
+                                    <include>org.apache.flink:flink-table-store-hive-catalog</include>
                                     <include>org.apache.flink:flink-table-store-kafka</include>
                                     <include>org.apache.flink:flink-sql-connector-kafka</include>
                                 </includes>
diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml
index aefec3cb..85a62b5f 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -40,7 +40,7 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-hive</artifactId>
+            <artifactId>flink-table-store-hive-connector</artifactId>
             <version>${project.version}</version>
         </dependency>
 
@@ -105,9 +105,9 @@ under the License.
                         </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
-                            <artifactId>flink-table-store-hive</artifactId>
+                            <artifactId>flink-table-store-hive-connector</artifactId>
                             <version>${project.version}</version>
-                            <destFileName>flink-table-store-hive.jar</destFileName>
+                            <destFileName>flink-table-store-hive-connector.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
                             <outputDirectory>${project.build.directory}/dependencies
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index 563c121b..be375f42 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -65,7 +65,8 @@ public abstract class E2eTestBase {
     }
 
     private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
-    protected static final String TABLE_STORE_HIVE_JAR_NAME = "flink-table-store-hive.jar";
+    protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
+            "flink-table-store-hive-connector.jar";
     private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
 
     protected static final String TEST_DATA_DIR = "/test-data";
@@ -122,7 +123,7 @@ public abstract class E2eTestBase {
         jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
 
         copyResource(TABLE_STORE_JAR_NAME);
-        copyResource(TABLE_STORE_HIVE_JAR_NAME);
+        copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME);
         copyResource(BUNDLED_HADOOP_JAR_NAME);
     }
 
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 98ce0893..9a4f7833 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class HiveE2eTest extends E2eTestBase {
 
     private static final String ADD_JAR_HQL =
-            "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_JAR_NAME + ";";
+            "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";";
 
     public HiveE2eTest() {
         super(false, true);
@@ -43,7 +43,6 @@ public class HiveE2eTest extends E2eTestBase {
 
     @Test
     public void testReadExternalTable() throws Exception {
-        // TODO write data directly to HDFS after FLINK-27562 is solved
         String tableStorePkDdl =
                 "CREATE TABLE IF NOT EXISTS table_store_pk (\n"
                         + "  a int,\n"
@@ -54,7 +53,7 @@ public class HiveE2eTest extends E2eTestBase {
                         + "  'bucket' = '2',\n"
                         + "  'root-path' = '%s'\n"
                         + ");";
-        String tableStorePkPath = TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store";
+        String tableStorePkPath = HDFS_ROOT + "/" + UUID.randomUUID().toString() + ".store";
         tableStorePkDdl = String.format(tableStorePkDdl, tableStorePkPath);
         runSql(
                 "INSERT INTO table_store_pk VALUES "
@@ -68,8 +67,6 @@ public class HiveE2eTest extends E2eTestBase {
                 "CREATE EXTERNAL TABLE IF NOT EXISTS table_store_pk\n"
                         + "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
                         + "LOCATION '"
-                        // hive cannot read from local path
-                        + HDFS_ROOT
                         + tableStorePkPath
                         + "/default_catalog.catalog/default_database.db/table_store_pk';";
         writeSharedFile(
@@ -78,13 +75,9 @@ public class HiveE2eTest extends E2eTestBase {
                 ADD_JAR_HQL
                         + "\n"
                         + externalTablePkDdl
-                        + "\n"
-                        + "SELECT b, a, c FROM table_store_pk ORDER BY b;");
+                        + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;");
 
         ContainerState hive = getHive();
-        hive.execInContainer("hdfs", "dfs", "-mkdir", "-p", HDFS_ROOT + TEST_DATA_DIR);
-        hive.execInContainer(
-                "hdfs", "dfs", "-copyFromLocal", tableStorePkPath, HDFS_ROOT + tableStorePkPath);
         Container.ExecResult execResult =
                 hive.execInContainer(
                         "/opt/hive/bin/hive",
@@ -99,6 +92,54 @@ public class HiveE2eTest extends E2eTestBase {
         }
     }
 
+    @Test
+    public void testFlinkWriteAndHiveRead() throws Exception {
+        String sql =
+                String.join(
+                        "\n",
+                        "CREATE CATALOG my_hive WITH (",
+                        "  'type' = 'table-store',",
+                        "  'metastore' = 'hive',",
+                        "  'uri' = 'thrift://hive-metastore:9083',",
+                        "  'warehouse' = '"
+                                + HDFS_ROOT
+                                + "/"
+                                + UUID.randomUUID().toString()
+                                + ".warehouse'",
+                        ");",
+                        "",
+                        "USE CATALOG my_hive;",
+                        "",
+                        "CREATE TABLE T (",
+                        "  a int,",
+                        "  b bigint,",
+                        "  c string",
+                        ") WITH (",
+                        "  'bucket' = '2'",
+                        ");",
+                        "",
+                        "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');");
+        runSql(sql);
+
+        writeSharedFile(
+                "query.hql",
+                // same default database name as Flink
+                ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;");
+
+        ContainerState hive = getHive();
+        Container.ExecResult execResult =
+                hive.execInContainer(
+                        "/opt/hive/bin/hive",
+                        "--hiveconf",
+                        "hive.root.logger=INFO,console",
+                        "-f",
+                        TEST_DATA_DIR + "/query.hql");
+        assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n");
+        if (execResult.getExitCode() != 0) {
+            throw new AssertionError("Failed when running hive sql.");
+        }
+    }
+
     private ContainerState getHive() {
         return environment.getContainerByServiceName("hive-server_1").get();
     }
diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index 42c89b27..d414d99f 100644
--- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -25,10 +25,10 @@ services:
   # ----------------------------------------
 
   jobmanager:
-    image: apache/flink:${flink.version}
+    image: apache/flink:${flink.version}-java8
     volumes:
       - testdata:/test-data
-    command: jobmanager
+    entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh jobmanager"
     env_file:
       - ./flink.env
     networks:
@@ -39,10 +39,10 @@ services:
       - "8081"
 
   taskmanager:
-    image: apache/flink:${flink.version}
+    image: apache/flink:${flink.version}-java8
     volumes:
       - testdata:/test-data
-    command: taskmanager
+    entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager"
     env_file:
       - ./flink.env
     networks:
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
similarity index 81%
copy from flink-table-store-hive/pom.xml
copy to flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
index 1d6bb57d..8d5c6c86 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml
@@ -23,68 +23,41 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
+        <artifactId>flink-table-store-hive</artifactId>
         <groupId>org.apache.flink</groupId>
         <version>0.2-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-hive</artifactId>
-    <name>Flink Table Store : Hive</name>
+    <artifactId>flink-table-store-hive-catalog</artifactId>
+    <name>Flink Table Store : Hive Catalog</name>
 
     <packaging>jar</packaging>
 
-    <properties>
-        <hiverunner.version>4.0.0</hiverunner.version>
-        <reflections.version>0.9.8</reflections.version>
-    </properties>
-
     <dependencies>
-        <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
+            <artifactId>flink-table-store-core</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
+            <artifactId>hive-metastore</artifactId>
             <version>${hive.version}</version>
-            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>log4j</groupId>
@@ -110,11 +83,6 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
-                <!-- this dependency cannot be fetched from central maven repository anymore -->
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
@@ -122,26 +90,31 @@ under the License.
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-core</artifactId>
+            <artifactId>flink-table-store-connector</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
-            <type>test-jar</type>
         </dependency>
 
-        <!--
-        IDEA reads classes from the same project from target/classes of that module,
-        so even though we've packaged and shaded avro classes into flink-table-store-format.jar
-        we still have to include this test dependency here.
-        -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-connector</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-sql-avro</artifactId>
+            <artifactId>flink-table-planner-loader</artifactId>
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
 
-        <!-- dependencies for IT cases -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -158,6 +131,34 @@ under the License.
             <type>test-jar</type>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-avro</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>com.klarna</groupId>
             <artifactId>hiverunner</artifactId>
@@ -475,11 +476,41 @@ under the License.
                             <goal>shade</goal>
                         </goals>
                         <configuration>
+                            <minimizeJar>true</minimizeJar>
                             <artifactSet>
-                                <includes combine.children="append">
-                                    <include>org.apache.flink:flink-table-store-shade</include>
+                                <includes>
+                                    <include>org.apache.flink:flink-table-store-hive-common</include>
+                                    <include>org.apache.thrift:libthrift</include>
+                                    <include>org.apache.thrift:libfb303</include>
+                                    <include>com.google.guava:guava</include>
+                                    <include>org.apache.hive:hive-common</include>
+                                    <include>org.apache.hive.shims:hive-shims-common</include>
+                                    <include>org.apache.hive:hive-serde</include>
+                                    <include>org.apache.hive:hive-metastore</include>
                                 </includes>
                             </artifactSet>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.facebook.fb303</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.facebook.fb303</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.com.google.common</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.hadoop.hive</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hadoop.hive</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.hive</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hive</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.thrift</pattern>
+                                    <shadedPattern>org.apache.flink.table.store.shaded.org.apache.thrift</shadedPattern>
+                                </relocation>
+                            </relocations>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
new file mode 100644
index 00000000..50a3f635
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.AbstractCatalog;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A catalog implementation for Hive. */
+public class HiveCatalog extends AbstractCatalog {
+
+    // we don't include flink-table-store-hive-connector as a dependency because it depends on
+    // hive-exec
+    private static final String INPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreInputFormat";
+    private static final String OUTPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreOutputFormat";
+    private static final String SERDE_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreSerDe";
+    private static final String STORAGE_HANDLER_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
+
+    private final HiveConf hiveConf;
+    private final IMetaStoreClient client;
+
+    public HiveCatalog(String thriftUri, String warehousePath) {
+        Configuration conf = new Configuration();
+        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
+        this.hiveConf = new HiveConf(conf, HiveConf.class);
+        try {
+            IMetaStoreClient client =
+                    RetryingMetaStoreClient.getProxy(
+                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+            this.client =
+                    StringUtils.isNullOrWhitespaceOnly(thriftUri)
+                            ? client
+                            : HiveMetaStoreClient.newSynchronizedClient(client);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() {
+        try {
+            return client.getAllDatabases();
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all databases", e);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) {
+        try {
+            client.getDatabase(databaseName);
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if database " + databaseName + " exists", e);
+        }
+    }
+
+    @Override
+    public void createDatabase(String name, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException {
+        try {
+            client.createDatabase(convertToDatabase(name));
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new DatabaseAlreadyExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create database " + name, e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
+        try {
+            if (!cascade && client.getAllTables(name).size() > 0) {
+                throw new DatabaseNotEmptyException(name);
+            }
+            client.dropDatabase(name, true, false, true);
+        } catch (NoSuchObjectException | UnknownDBException e) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop database " + name, e);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException {
+        try {
+            return client.getAllTables(databaseName);
+        } catch (UnknownDBException e) {
+            throw new DatabaseNotExistException(databaseName, e);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
+        }
+    }
+
+    @Override
+    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            throw new TableNotExistException(tablePath);
+        }
+        Path tableLocation = getTableLocation(tablePath);
+        return new SchemaManager(tableLocation)
+                .latest()
+                .orElseThrow(
+                        () -> new RuntimeException("There is no table stored in " + tableLocation));
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) {
+        try {
+            client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if table " + tablePath.getFullName() + " exists", e);
+        }
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        try {
+            client.dropTable(
+                    tablePath.getDatabaseName(), tablePath.getObjectName(), true, false, true);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void createTable(ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        String databaseName = tablePath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(databaseName);
+        }
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new TableAlreadyExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if changes on Hive fail, there is no harm in applying the same changes to the files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        Table table = newHmsTable(tablePath);
+        updateHmsTable(table, tablePath, schema);
+        try {
+            client.createTable(table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if changes on Hive fail, there is no harm in applying the same changes to the files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        try {
+            Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            updateHmsTable(table, tablePath, schema);
+            client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    @Override
+    protected String warehouse() {
+        return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+    }
+
+    private Database convertToDatabase(String name) {
+        Database database = new Database();
+        database.setName(name);
+        database.setLocationUri(databasePath(name).toString());
+        return database;
+    }
+
+    private Table newHmsTable(ObjectPath tablePath) {
+        long currentTimeMillis = System.currentTimeMillis();
+        Table table =
+                new Table(
+                        tablePath.getObjectName(),
+                        tablePath.getDatabaseName(),
+                        // current Linux user
+                        System.getProperty("user.name"),
+                        (int) (currentTimeMillis / 1000),
+                        (int) (currentTimeMillis / 1000),
+                        Integer.MAX_VALUE,
+                        null,
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        null,
+                        null,
+                        TableType.MANAGED_TABLE.toString());
+        table.getParameters()
+                .put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
+        return table;
+    }
+
+    private void updateHmsTable(Table table, ObjectPath tablePath, TableSchema schema) {
+        StorageDescriptor sd = convertToStorageDescriptor(tablePath, schema);
+        table.setSd(sd);
+    }
+
+    private StorageDescriptor convertToStorageDescriptor(ObjectPath tablePath, TableSchema schema) {
+        StorageDescriptor sd = new StorageDescriptor();
+
+        sd.setCols(
+                schema.fields().stream()
+                        .map(this::convertToFieldSchema)
+                        .collect(Collectors.toList()));
+        sd.setLocation(getTableLocation(tablePath).toString());
+
+        sd.setInputFormat(INPUT_FORMAT_CLASS_NAME);
+        sd.setOutputFormat(OUTPUT_FORMAT_CLASS_NAME);
+
+        SerDeInfo serDeInfo = new SerDeInfo();
+        serDeInfo.setParameters(new HashMap<>());
+        serDeInfo.setSerializationLib(SERDE_CLASS_NAME);
+        sd.setSerdeInfo(serDeInfo);
+
+        return sd;
+    }
+
+    private FieldSchema convertToFieldSchema(DataField dataField) {
+        return new FieldSchema(
+                dataField.name(),
+                HiveTypeUtils.logicalTypeToTypeInfo(dataField.type().logicalType()).getTypeName(),
+                dataField.description());
+    }
+
+    private boolean isTableStoreTableNotExisted(ObjectPath tablePath) {
+        Table table;
+        try {
+            table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+        } catch (NoSuchObjectException e) {
+            return true;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Cannot determine if table "
+                            + tablePath.getFullName()
+                            + " is a table store table.",
+                    e);
+        }
+
+        if (!INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+                || !OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat())) {
+            throw new IllegalArgumentException(
+                    "Table "
+                            + tablePath.getFullName()
+                            + " is not a table store table. It's input format is "
+                            + table.getSd().getInputFormat()
+                            + " and its output format is "
+                            + table.getSd().getOutputFormat());
+        }
+        return false;
+    }
+
+    private TableSchema commitToUnderlyingFiles(ObjectPath tablePath, UpdateSchema schema) {
+        Path path = getTableLocation(tablePath);
+        try {
+            return new SchemaManager(path).commitNewVersion(schema);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to commit changes of table "
+                            + tablePath.getFullName()
+                            + " to underlying files",
+                    e);
+        }
+    }
+}
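
For orientation, the directory layout the catalog code above relies on (a sketch; databasePath and getTableLocation are inherited helpers not shown in this hunk, and the layout is inferred from the IT case further below, e.g. "test_db2.db/T"):

    // Sketch of the warehouse layout behind convertToDatabase and
    // convertToStorageDescriptor; names mirror the helpers used above.
    // <warehouse>/<db>.db           -> database location
    // <warehouse>/<db>.db/<table>   -> table location
    Path db = new Path(warehouse(), databaseName + ".db");
    Path table = new Path(db, tablePath.getObjectName());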
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
new file mode 100644
index 00000000..1e2cbabb
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
+
+/** Factory to create {@link HiveCatalog}. */
+public class HiveCatalogFactory implements CatalogFactory {
+
+    private static final String IDENTIFIER = "hive";
+
+    private static final ConfigOption<String> URI =
+            ConfigOptions.key("uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Uri of Hive metastore's thrift server.");
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Catalog create(String warehouse, ReadableConfig options) {
+        String uri =
+                Preconditions.checkNotNull(
+                        options.get(URI),
+                        URI.key() + " must be set for table store " + IDENTIFIER + " catalog");
+        return new HiveCatalog(uri, warehouse);
+    }
+}
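
From SQL, this factory is selected via the 'metastore' option; a minimal usage sketch (the URI and warehouse values here are placeholders, the statement itself mirrors the IT case below):

    // Registers a table store catalog backed by a Hive metastore.
    tEnv.executeSql(
            String.join(
                    "\n",
                    "CREATE CATALOG my_hive WITH (",
                    "  'type' = 'table-store',",
                    "  'metastore' = 'hive',",
                    "  'uri' = 'thrift://hive-metastore:9083',",
                    "  'warehouse' = 'hdfs:///path/to/warehouse'",
                    ")"));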
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
new file mode 100644
index 00000000..e9404fcd
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.hive.HiveCatalogFactory
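
This service entry is what makes the factory discoverable at runtime. A sketch of the standard JDK ServiceLoader lookup (illustrative only; the actual resolution lives in CatalogFactory, introduced earlier in this commit):

    // Hypothetical resolution of the 'metastore' option to a factory.
    for (CatalogFactory factory : ServiceLoader.load(CatalogFactory.class)) {
        if (factory.identifier().equalsIgnoreCase(metastore)) {
            return factory.create(warehouse, options);
        }
    }
    throw new IllegalArgumentException("Unknown metastore type: " + metastore);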
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
new file mode 100644
index 00000000..1b5654b7
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** IT cases for {@link HiveCatalog}. */
+@RunWith(FlinkEmbeddedHiveRunner.class)
+public class HiveCatalogITCase {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    private String path;
+    private TableEnvironment tEnv;
+
+    @HiveSQL(files = {})
+    private static HiveShell hiveShell;
+
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
+
+        path = folder.newFolder().toURI().toString();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        tEnv = TableEnvironmentImpl.create(settings);
+        tEnv.executeSql(
+                        String.join(
+                                "\n",
+                                "CREATE CATALOG my_hive WITH (",
+                                "  'type' = 'table-store',",
+                                "  'metastore' = 'hive',",
+                                "  'uri' = '',",
+                                "  'warehouse' = '" + path + "'",
+                                ")"))
+                .await();
+        tEnv.executeSql("USE CATALOG my_hive").await();
+        tEnv.executeSql("USE test_db").await();
+    }
+
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
+    }
+
+    @Test
+    public void testDatabaseOperations() throws Exception {
+        // create database
+        tEnv.executeSql("CREATE DATABASE test_db2").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db"), Row.of("test_db2")),
+                collect("SHOW DATABASES"));
+        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db2").await();
+        try {
+            tEnv.executeSql("CREATE DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 already exists in Catalog my_hive");
+        }
+
+        // drop database
+        tEnv.executeSql("DROP DATABASE test_db2").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+        tEnv.executeSql("DROP DATABASE IF EXISTS test_db2").await();
+        try {
+            tEnv.executeSql("DROP DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 does not exist in Catalog my_hive");
+        }
+
+        // drop non-empty database
+        tEnv.executeSql("CREATE DATABASE test_db2").await();
+        tEnv.executeSql("USE test_db2").await();
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db2.db/T");
+        Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+        try {
+            tEnv.executeSql("DROP DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 in catalog my_hive is not empty");
+        }
+        tEnv.executeSql("DROP DATABASE test_db2 CASCADE").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+        Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+    }
+
+    @Test
+    public void testTableOperations() throws Exception {
+        // create table
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("hive_table"), Row.of("s"), Row.of("t")),
+                collect("SHOW TABLES"));
+        tEnv.executeSql(
+                        "CREATE TABLE IF NOT EXISTS S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        try {
+            tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                    .await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table (or view) test_db.S already exists in Catalog my_hive");
+        }
+
+        // drop table
+        tEnv.executeSql("INSERT INTO S VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db.db/S");
+        Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+        tEnv.executeSql("DROP TABLE S").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("hive_table"), Row.of("t")), collect("SHOW TABLES"));
+        Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+        tEnv.executeSql("DROP TABLE IF EXISTS S").await();
+        try {
+            tEnv.executeSql("DROP TABLE S").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table with identifier 'my_hive.test_db.S' does not exist");
+        }
+        try {
+            tEnv.executeSql("DROP TABLE hive_table").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+
+        // alter table
+        tEnv.executeSql("ALTER TABLE T SET ( 'manifest.target-file-size' = '16MB' )").await();
+        List<Row> actual = collect("SHOW CREATE TABLE T");
+        Assert.assertEquals(1, actual.size());
+        Assert.assertTrue(
+                actual.get(0)
+                        .getField(0)
+                        .toString()
+                        .contains("'manifest.target-file-size' = '16MB'"));
+        try {
+            tEnv.executeSql("ALTER TABLE S SET ( 'manifest.target-file-size' = '16MB' )").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table `my_hive`.`test_db`.`S` doesn't exist or is a temporary table");
+        }
+        try {
+            tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )")
+                    .await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+    }
+
+    @Test
+    public void testFlinkWriteAndHiveRead() throws Exception {
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Assert.assertEquals(
+                Arrays.asList("1\tHi", "2\tHello"),
+                hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
+
+        try {
+            tEnv.executeSql("INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+    }
+
+    private List<Row> collect(String sql) throws Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
+}
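
Note on testFlinkWriteAndHiveRead: no Hive-side DDL is needed because createTable registered the storage handler in the HMS table parameters (META_TABLE_STORAGE above), so a plain HiveQL scan is routed through the table store input format:

    // What the test executes on the Hive side; 't' is the table created via Flink.
    List<String> rows = hiveShell.executeQuery("SELECT * FROM t ORDER BY a");
    // -> ["1\tHi", "2\tHello"]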
diff --git a/flink-table-store-hive/flink-table-store-hive-common/pom.xml b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
new file mode 100644
index 00000000..a87aa767
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-hive</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-hive-common</artifactId>
+    <name>Flink Table Store : Hive Common</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-serde</artifactId>
+            <version>${hive.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
similarity index 55%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
rename to flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
index d6061b12..b9af52f6 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
+++ b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.table.store.hive.objectinspector.TableStoreCharObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreDateObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreDecimalObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreListObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreMapObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreStringObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreTimestampObjectInspector;
-import org.apache.flink.table.store.hive.objectinspector.TableStoreVarcharObjectInspector;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -33,9 +25,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.VarCharType;
 
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
@@ -93,48 +82,4 @@ public class HiveTypeUtils {
                         "Unsupported logical type " + logicalType.asSummaryString());
         }
     }
-
-    public static ObjectInspector getObjectInspector(LogicalType logicalType) {
-        switch (logicalType.getTypeRoot()) {
-            case BOOLEAN:
-            case TINYINT:
-            case SMALLINT:
-            case INTEGER:
-            case BIGINT:
-            case FLOAT:
-            case DOUBLE:
-            case BINARY:
-            case VARBINARY:
-                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
-                        (PrimitiveTypeInfo) logicalTypeToTypeInfo(logicalType));
-            case DECIMAL:
-                DecimalType decimalType = (DecimalType) logicalType;
-                return new TableStoreDecimalObjectInspector(
-                        decimalType.getPrecision(), decimalType.getScale());
-            case CHAR:
-                CharType charType = (CharType) logicalType;
-                return new TableStoreCharObjectInspector(charType.getLength());
-            case VARCHAR:
-                VarCharType varCharType = (VarCharType) logicalType;
-                if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
-                    return new TableStoreStringObjectInspector();
-                } else {
-                    return new TableStoreVarcharObjectInspector(varCharType.getLength());
-                }
-            case DATE:
-                return new TableStoreDateObjectInspector();
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return new TableStoreTimestampObjectInspector();
-            case ARRAY:
-                ArrayType arrayType = (ArrayType) logicalType;
-                return new TableStoreListObjectInspector(arrayType.getElementType());
-            case MAP:
-                MapType mapType = (MapType) logicalType;
-                return new TableStoreMapObjectInspector(
-                        mapType.getKeyType(), mapType.getValueType());
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported logical type " + logicalType.asSummaryString());
-        }
-    }
 }
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
similarity index 97%
copy from flink-table-store-hive/pom.xml
copy to flink-table-store-hive/flink-table-store-hive-connector/pom.xml
index 1d6bb57d..23598562 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml
@@ -23,22 +23,23 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
-        <artifactId>flink-table-store-parent</artifactId>
+        <artifactId>flink-table-store-hive</artifactId>
         <groupId>org.apache.flink</groupId>
         <version>0.2-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-hive</artifactId>
-    <name>Flink Table Store : Hive</name>
+    <artifactId>flink-table-store-hive-connector</artifactId>
+    <name>Flink Table Store : Hive Connector</name>
 
     <packaging>jar</packaging>
 
-    <properties>
-        <hiverunner.version>4.0.0</hiverunner.version>
-        <reflections.version>0.9.8</reflections.version>
-    </properties>
-
     <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Flink All dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -477,6 +478,7 @@ under the License.
                         <configuration>
                             <artifactSet>
                                 <includes combine.children="append">
+                                    <include>org.apache.flink:flink-table-store-hive-common</include>
                                     <include>org.apache.flink:flink-table-store-shade</include>
                                 </includes>
                             </artifactSet>
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
similarity index 95%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
index 02b0e724..225d2ed2 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.hive.objectinspector;
 
 import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -41,7 +40,7 @@ public class TableStoreListObjectInspector implements ListObjectInspector {
     private final ArrayData.ElementGetter elementGetter;
 
     public TableStoreListObjectInspector(LogicalType elementType) {
-        this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementType);
+        this.elementObjectInspector = TableStoreObjectInspectorFactory.create(elementType);
         this.elementGetter = ArrayData.createElementGetter(elementType);
     }
 
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
similarity index 94%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
index 4b4d606f..c37d3f2e 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.hive.objectinspector;
 
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.store.hive.HiveTypeUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -45,8 +44,8 @@ public class TableStoreMapObjectInspector implements MapObjectInspector {
     private final ArrayData.ElementGetter valueGetter;
 
     public TableStoreMapObjectInspector(LogicalType keyType, LogicalType valueType) {
-        this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyType);
-        this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueType);
+        this.keyObjectInspector = TableStoreObjectInspectorFactory.create(keyType);
+        this.valueObjectInspector = TableStoreObjectInspectorFactory.create(valueType);
         this.keyGetter = ArrayData.createElementGetter(keyType);
         this.valueGetter = ArrayData.createElementGetter(valueType);
     }
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java
new file mode 100644
index 00000000..dca50ca0
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive.objectinspector;
+
+import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+
+/** Factory to create {@link ObjectInspector}s according to the given {@link LogicalType}. */
+public class TableStoreObjectInspectorFactory {
+
+    public static ObjectInspector create(LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case BINARY:
+            case VARBINARY:
+                return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
+                        (PrimitiveTypeInfo) HiveTypeUtils.logicalTypeToTypeInfo(logicalType));
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) logicalType;
+                return new TableStoreDecimalObjectInspector(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case CHAR:
+                CharType charType = (CharType) logicalType;
+                return new TableStoreCharObjectInspector(charType.getLength());
+            case VARCHAR:
+                VarCharType varCharType = (VarCharType) logicalType;
+                if (varCharType.getLength() == VarCharType.MAX_LENGTH) {
+                    return new TableStoreStringObjectInspector();
+                } else {
+                    return new TableStoreVarcharObjectInspector(varCharType.getLength());
+                }
+            case DATE:
+                return new TableStoreDateObjectInspector();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return new TableStoreTimestampObjectInspector();
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) logicalType;
+                return new TableStoreListObjectInspector(arrayType.getElementType());
+            case MAP:
+                MapType mapType = (MapType) logicalType;
+                return new TableStoreMapObjectInspector(
+                        mapType.getKeyType(), mapType.getValueType());
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported logical type " + logicalType.asSummaryString());
+        }
+    }
+}
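
A quick sketch of what the factory hands back for a couple of representative types (the types are chosen here purely for illustration):

    // Primitives reuse Hive's own inspectors; parameterized types get the
    // table store implementations defined in this package.
    ObjectInspector intOi = TableStoreObjectInspectorFactory.create(new IntType());
    ObjectInspector charOi = TableStoreObjectInspectorFactory.create(new CharType(10));
    // intOi  -> Hive's Java int inspector
    // charOi -> TableStoreCharObjectInspector with length 10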
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
similarity index 98%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index 9545ccdc..a783be83 100644
--- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -51,7 +51,7 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
             TableStoreStructField structField =
                     new TableStoreStructField(
                             name,
-                            HiveTypeUtils.getObjectInspector(logicalType),
+                            TableStoreObjectInspectorFactory.create(logicalType),
                             i,
                             RowData.createFieldGetter(logicalType, i),
                             fieldComments.get(i));
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java
diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
similarity index 100%
rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
similarity index 99%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 5b518d50..d6e87bf7 100644
--- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.store.FileStoreTestUtils;
 import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -225,7 +226,7 @@ public class TableStoreHiveStorageHandlerITCase {
                     continue;
                 }
                 ObjectInspector oi =
-                        HiveTypeUtils.getObjectInspector(
+                        TableStoreObjectInspectorFactory.create(
                                 RandomGenericRowDataGenerator.LOGICAL_TYPES.get(i));
                 switch (oi.getCategory()) {
                     case PRIMITIVE:
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java
diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
similarity index 100%
rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
diff --git a/flink-table-store-hive/src/test/resources/log4j2-test.properties b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
similarity index 100%
rename from flink-table-store-hive/src/test/resources/log4j2-test.properties
rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties
diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/pom.xml
index 1d6bb57d..ebf590fb 100644
--- a/flink-table-store-hive/pom.xml
+++ b/flink-table-store-hive/pom.xml
@@ -31,459 +31,18 @@ under the License.
     <artifactId>flink-table-store-hive</artifactId>
     <name>Flink Table Store : Hive</name>
 
-    <packaging>jar</packaging>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>flink-table-store-hive-catalog</module>
+        <module>flink-table-store-hive-common</module>
+        <module>flink-table-store-hive-connector</module>
+    </modules>
 
     <properties>
+        <hive.version>2.3.4</hive.version>
         <hiverunner.version>4.0.0</hiverunner.version>
         <reflections.version>0.9.8</reflections.version>
     </properties>
 
-    <dependencies>
-        <!-- Flink All dependencies -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-shade</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <version>${hadoop.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <version>${hive.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-annotations</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.orc</groupId>
-                    <artifactId>orc-core</artifactId>
-                </exclusion>
-                <!-- this dependency cannot be fetched from central maven repository anymore -->
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- test dependencies -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-store-core</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-
-        <!--
-        IDEA reads classes from the same project from target/classes of that module,
-        so even though we've packaged and shaded avro classes into flink-table-store-format.jar
-        we still have to include this test dependency here.
-        -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-sql-avro</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- dependencies for IT cases -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
-        </dependency>
-
-        <dependency>
-            <groupId>com.klarna</groupId>
-            <artifactId>hiverunner</artifactId>
-            <version>${hiverunner.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-serde</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-jdbc</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-service</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-contrib</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-exec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-hcatalog-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive.hcatalog</groupId>
-                    <artifactId>hive-webhcat-java-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.tez</groupId>
-                    <artifactId>tez-common</artifactId>
-                </exclusion>
-                <exclusion>
-                    <!-- This dependency is no longer shipped with the JDK since Java 9.-->
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-auth</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-annotations</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-hdfs</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-mapreduce-client-core</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-api</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-client</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-server-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-server-web-proxy</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-shim</artifactId>
-                    <groupId>org.apache.tez</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>jms</artifactId>
-                    <groupId>javax.jms</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.reflections</groupId>
-            <artifactId>reflections</artifactId>
-            <version>${reflections.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-service</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-exec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-metastore</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <exclusion>
-                    <!-- This dependency is no longer shipped with the JDK since Java 9.-->
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-auth</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-client</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-annotations</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-hdfs</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-mapreduce-client-core</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-api</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-registry</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-server-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hbase-hadoop-compat</artifactId>
-                    <groupId>org.apache.hbase</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-hcatalog-core</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hive</groupId>
-                    <artifactId>hive-exec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-common</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-archives</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-annotations</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-hdfs</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>hadoop-mapreduce-client-core</artifactId>
-                    <groupId>org.apache.hadoop</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>apache-log4j-extras</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-databind</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hive.hcatalog</groupId>
-            <artifactId>hive-webhcat-java-client</artifactId>
-            <version>${hive.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>jdk.tools</groupId>
-                    <artifactId>jdk.tools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>jms</artifactId>
-                    <groupId>javax.jms</groupId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit4.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.vintage</groupId>
-            <artifactId>junit-vintage-engine</artifactId>
-            <version>${junit5.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>junit</groupId>
-                    <artifactId>junit</artifactId>
-                </exclusion>
-            </exclusions>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>shade-flink</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <artifactSet>
-                                <includes combine.children="append">
-                                    <include>org.apache.flink:flink-table-store-shade</include>
-                                </includes>
-                            </artifactSet>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
+</project>
\ No newline at end of file
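The hunk above converts flink-table-store-hive from a single jar module into a pom-packaged aggregator with three submodules (flink-table-store-hive-catalog, flink-table-store-hive-common, flink-table-store-hive-connector); the dependency list and shade configuration it used to carry move into those children. As a rough sketch only, not the literal content of this commit, a child pom under this layout would start roughly as follows. The real child poms appear elsewhere in this diff, and the version shown is a placeholder:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-store-hive</artifactId>
            <!-- placeholder; a real child pom uses the repository's current version -->
            <version>0.2-SNAPSHOT</version>
        </parent>

        <artifactId>flink-table-store-hive-catalog</artifactId>
        <name>Flink Table Store : Hive : Catalog</name>
        <packaging>jar</packaging>
    </project>

Splitting the module this way lets the catalog, the shared Hive utilities, and the connector be built and shaded independently instead of forcing every consumer to pull in the full Hive dependency tree.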
diff --git a/pom.xml b/pom.xml
index d2f88bc4..92d48714 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@ under the License.
         <flink.shaded.version>15.0</flink.shaded.version>
         <flink.shaded.jackson.version>2.12.4</flink.shaded.jackson.version>
         <hadoop.version>2.8.5</hadoop.version>
-        <hive.version>2.3.4</hive.version>
         <scala.version>2.12.7</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
         <snappy.version>1.1.8.3</snappy.version>
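With the restructuring, the hive.version property moves out of the root pom and into the new flink-table-store-hive parent (see the +<hive.version>2.3.4</hive.version> line earlier in this diff), so the property is now visible only to the Hive submodules that actually need it. Assuming the hive-exec dependency removed above is re-declared in flink-table-store-hive-connector, a child module would keep referencing the inherited property unchanged; a minimal sketch:

    <!-- sketch: inside a flink-table-store-hive submodule, ${hive.version}
         now resolves from the flink-table-store-hive parent pom (2.3.4) -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>provided</scope>
    </dependency>

Modules outside flink-table-store-hive no longer see the property at all, which keeps the root pom free of Hive-specific configuration.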