You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/23 05:48:34 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d822b3eba [KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2
d822b3eba is described below

commit d822b3eba3050eb6a3908cf9c6c059ca29f82100
Author: yikf <yi...@gmail.com>
AuthorDate: Tue Aug 23 13:48:21 2022 +0800

    [KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2
    
    ### _Why are the changes needed?_
    
    In a modern database architecture, users may have a strong need for federated queries. Since there are a large number of Hive warehouse in the history database, we tried to implement the Hive V2 Datasource based on Spark Datasource V2 to meet this need. for the discussion, see :https://lists.apache.org/thread/fq8ywr58rzf9bycflj1q4fl1xyz2rq2w
    
    This PR is the first step in fixing https://github.com/apache/incubator-kyuubi/issues/3259, having
    - initialization implementation
    - support read code path
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3260 from yikf/hive-v2-connector.
    
    Closes #3259
    
    753aca30 [yikf] Initial implementation of the Hive Connector based on the Spark datasource V2
    
    Authored-by: yikf <yi...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../spark/kyuubi-spark-connector-hive/pom.xml      | 216 +++++++++++
 .../spark/connector/hive/HiveFileIndex.scala       | 256 +++++++++++++
 .../kyuubi/spark/connector/hive/HiveReader.scala   | 106 ++++++
 .../kyuubi/spark/connector/hive/HiveScan.scala     | 147 ++++++++
 .../spark/connector/hive/HiveScanBuilder.scala     |  42 +++
 .../kyuubi/spark/connector/hive/HiveTable.scala    |  74 ++++
 .../spark/connector/hive/HiveTableCatalog.scala    | 415 +++++++++++++++++++++
 .../hive/KyuubiHiveConnectorException.scala        |  22 ++
 .../connector/hive/read/FilePartitionReader.scala  | 112 ++++++
 .../hive/read/HivePartitionReaderFactory.scala     | 197 ++++++++++
 .../hive/read/HivePartitionedFileReader.scala      |  33 ++
 .../hive/read/HivePartitionedReader.scala          | 154 ++++++++
 .../kyuubi/connector/HiveConnectorHelper.scala     |  80 ++++
 .../spark/connector/hive/HiveCatalogSuite.scala    |  73 ++++
 .../spark/connector/hive/HiveConnectorSuite.scala  |  74 ++++
 pom.xml                                            |   1 +
 16 files changed, 2002 insertions(+)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/pom.xml b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
new file mode 100644
index 000000000..9f0912992
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kyuubi-spark-connector-hive</artifactId>
+    <name>Kyuubi Spark Hive Connector</name>
+    <description>A Kyuubi hive connector based on Spark V2 DataSource</description>
+    <packaging>jar</packaging>
+    <url>https://kyuubi.apache.org/</url>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatestplus</groupId>
+            <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
+        <!--
+          Spark requires `commons-collections` and `commons-io` but got them from transitive
+          dependencies of `hadoop-client`. As we are using Hadoop Shaded Client, we need add
+          them explicitly. See more details at SPARK-33212.
+          -->
+        <dependency>
+            <groupId>commons-collections</groupId>
+            <artifactId>commons-collections</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <configuration>
+                    <shadedArtifactAttached>false</shadedArtifactAttached>
+                    <artifactSet>
+                        <includes>
+                            <include>com.google.guava:guava</include>
+                            <include>org.apache.kyuubi:kyuubi-spark-connector-common_${scala.binary.version}</include>
+                        </includes>
+                    </artifactSet>
+                    <relocations>
+                        <relocation>
+                            <pattern>com.google.common</pattern>
+                            <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+                            <includes>
+                                <include>com.google.common.**</include>
+                            </includes>
+                        </relocation>
+                    </relocations>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>prepare-test-jar</id>
+                        <phase>test-compile</phase>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala
new file mode 100644
index 000000000..0a8ec26b4
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.net.URI
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, Predicate}
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.hiveClientImpl
+import org.apache.spark.sql.types.StructType
+
+class HiveCatalogFileIndex(
+    sparkSession: SparkSession,
+    val catalogTable: CatalogTable,
+    hiveCatalog: HiveTableCatalog,
+    override val sizeInBytes: Long)
+  extends PartitioningAwareFileIndex(
+    sparkSession,
+    catalogTable.storage.properties,
+    Some(catalogTable.schema)) {
+
+  private val table = catalogTable
+
+  private val partPathToBindHivePart: mutable.Map[PartitionPath, HivePartition] = mutable.Map()
+
+  private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+  private lazy val hiveTable: Table = hiveClientImpl.toHiveTable(table)
+
+  private val baseLocation: Option[URI] = table.storage.locationUri
+
+  override def partitionSchema: StructType = table.partitionSchema
+
+  private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
+      : (Seq[PartitionDirectory], Map[PartitionDirectory, HivePartition]) = {
+    val fileIndex = filterPartitions(partitionFilters)
+    val partDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+    val partDirToHivePart = fileIndex.partDirToBindHivePartMap()
+    (partDirs, partDirToHivePart)
+  }
+
+  def filterPartitions(filters: Seq[Expression]): HiveInMemoryFileIndex = {
+    if (table.partitionColumnNames.nonEmpty) {
+      val startTime = System.nanoTime()
+      val selectedPartitions: Seq[BindPartition] = {
+        val partitions =
+          ExternalCatalogUtils.listPartitionsByFilter(
+            sparkSession.sessionState.conf,
+            hiveCatalog.catalog,
+            table,
+            filters)
+        partitions.map(buildBindPartition)
+      }
+
+      val partitions = selectedPartitions.map {
+        case BindPartition(catalogTablePartition, hivePartition) =>
+          val path = new Path(catalogTablePartition.location)
+          val fs = path.getFileSystem(hadoopConf)
+          val partPath = PartitionPath(
+            catalogTablePartition.toRow(
+              partitionSchema,
+              sparkSession.sessionState.conf.sessionLocalTimeZone),
+            path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+          partPathToBindHivePart += (partPath -> hivePartition)
+          partPath
+      }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val timeNs = System.nanoTime() - startTime
+      new HiveInMemoryFileIndex(
+        sparkSession = sparkSession,
+        rootPathsSpecified = partitionSpec.partitions.map(_.path),
+        parameters = table.properties,
+        partPathToBindHivePart = partPathToBindHivePart.toMap,
+        userSpecifiedSchema = Some(partitionSpec.partitionColumns),
+        fileStatusCache = fileStatusCache,
+        userSpecifiedPartitionSpec = Some(partitionSpec),
+        metadataOpsTimeNs = Some(timeNs))
+    } else {
+      new HiveInMemoryFileIndex(
+        sparkSession = sparkSession,
+        rootPathsSpecified = rootPaths,
+        parameters = table.properties,
+        userSpecifiedSchema = None,
+        fileStatusCache = fileStatusCache)
+    }
+  }
+
+  private def buildBindPartition(partition: CatalogTablePartition): BindPartition =
+    BindPartition(partition, hiveClientImpl.toHivePartition(partition, hiveTable))
+
+  override def partitionSpec(): PartitionSpec = {
+    throw notSupportOperator("partitionSpec")
+  }
+
+  override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    throw notSupportOperator("leafFiles")
+  }
+
+  override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    throw notSupportOperator("leafDirToChildrenFiles")
+  }
+
+  override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+  override def refresh(): Unit = fileStatusCache.invalidateAll()
+
+  def notSupportOperator(operator: String): UnsupportedOperationException = {
+    new UnsupportedOperationException(s"Not support $operator in Hive file index.")
+  }
+}
+
+class HiveInMemoryFileIndex(
+    sparkSession: SparkSession,
+    rootPathsSpecified: Seq[Path],
+    parameters: Map[String, String],
+    userSpecifiedSchema: Option[StructType],
+    partPathToBindHivePart: Map[PartitionPath, HivePartition] = Map.empty,
+    fileStatusCache: FileStatusCache = NoopCache,
+    userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+    override val metadataOpsTimeNs: Option[Long] = None)
+  extends InMemoryFileIndex(
+    sparkSession,
+    rootPathsSpecified,
+    parameters,
+    userSpecifiedSchema,
+    fileStatusCache,
+    userSpecifiedPartitionSpec,
+    metadataOpsTimeNs) {
+
+  private val partDirToBindHivePart: mutable.Map[PartitionDirectory, HivePartition] = mutable.Map()
+
+  override def listFiles(
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    def isNonEmptyFile(f: FileStatus): Boolean = {
+      isDataPath(f.getPath) && f.getLen > 0
+    }
+    val selectedPartitions =
+      if (partitionSpec().partitionColumns.isEmpty) {
+        PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
+      } else {
+        if (recursiveFileLookup) {
+          throw new IllegalArgumentException(
+            "Datasource with partition do not allow recursive file loading.")
+        }
+        prunePartitions(partitionFilters, partitionSpec()).map {
+          case partPath @ PartitionPath(values, path) =>
+            val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+              case Some(existingDir) =>
+                // Directory has children files in it, return them
+                existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
+
+              case None =>
+                // Directory does not exist, or has no children files
+                Nil
+            }
+            val partDir = PartitionDirectory(values, files)
+            // Update Partition Directory -> binding Hive part map
+            updatePartDirHivePartitionMapping(partDir, partPath)
+
+            partDir
+        }
+      }
+    logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+    selectedPartitions
+  }
+
+  private def prunePartitions(
+      predicates: Seq[Expression],
+      partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+    val PartitionSpec(partitionColumns, partitions) = partitionSpec
+    val partitionColumnNames = partitionColumns.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = Predicate.createInterpreted(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionColumns.indexWhere(a.name == _.name)
+          BoundReference(index, partitionColumns(index).dataType, nullable = true)
+      })
+
+      val selected = partitions.filter {
+        case PartitionPath(values, _) => boundPredicate.eval(values)
+      }
+      logInfo {
+        val total = partitions.length
+        val selectedSize = selected.length
+        val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+        s"Selected $selectedSize partitions out of $total, " +
+          s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
+      }
+
+      selected
+    } else {
+      partitions
+    }
+  }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+  }
+
+  def partDirToBindHivePartMap(): Map[PartitionDirectory, HivePartition] = {
+    partDirToBindHivePart.toMap
+  }
+
+  def updatePartDirHivePartitionMapping(
+      partDir: PartitionDirectory,
+      partPath: PartitionPath): Unit = {
+    if (partPathToBindHivePart.contains(partPath)) {
+      partDirToBindHivePart += (partDir -> partPathToBindHivePart(partPath))
+    }
+  }
+}
+
+case class BindPartition(catalogTablePartition: CatalogTablePartition, hivePartition: HivePartition)
+
+object HiveTableCatalogFileIndex {
+  implicit class CatalogHelper(plugin: CatalogPlugin) {
+    def asHiveCatalog: HiveTableCatalog = plugin match {
+      case hiveTableCatalog: HiveTableCatalog =>
+        hiveTableCatalog
+      case _ =>
+        throw KyuubiHiveConnectorException(
+          s"Cannot use catalog ${plugin.name}: not a HiveTableCatalog")
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala
new file mode 100644
index 000000000..bd8dadbbe
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.hive.ql.metadata.{Table => TableHive}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{hiveShim, hiveTableUtil}
+import org.apache.spark.sql.types.StructType
+
+object HiveReader {
+
+  def initializeHiveConf(
+      hiveTable: TableHive,
+      hiveConf: Configuration,
+      dataSchema: StructType,
+      readDataSchema: StructType): Unit = {
+    // Build tableDesc from hiveTable
+    val tableDesc = getTableDec(hiveTable)
+    // Set required columns and serde columns to hiveConf according to schema
+    addColumnMetadataToConf(tableDesc, hiveConf, dataSchema, readDataSchema)
+    // Copy hive table properties to hiveConf. For example,
+    // initial job conf to read files with specified format
+    hiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, hiveConf, false)
+  }
+
+  private def addColumnMetadataToConf(
+      tableDesc: TableDesc,
+      hiveConf: Configuration,
+      dataSchema: StructType,
+      readDataSchema: StructType): Unit = {
+    // Set required column id and name to hiveConf,
+    // to specified read columns without partition schema of Hive file reader
+    val neededColumnNames = readDataSchema.map(_.name)
+    val neededColumnIDs =
+      readDataSchema.map(field => Integer.valueOf(dataSchema.fields.indexOf(field)))
+
+    hiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+    val deserializer = tableDesc.getDeserializerClass.newInstance
+    deserializer.initialize(hiveConf, tableDesc.getProperties)
+
+    // Specifies types and object inspectors of columns to be scanned.
+    val structOI = ObjectInspectorUtils
+      .getStandardObjectInspector(
+        deserializer.getObjectInspector,
+        ObjectInspectorCopyOption.JAVA)
+      .asInstanceOf[StructObjectInspector]
+
+    val columnTypeNames = structOI
+      .getAllStructFieldRefs.asScala
+      .map(_.getFieldObjectInspector)
+      .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+      .mkString(",")
+
+    hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+    hiveConf.set(serdeConstants.LIST_COLUMNS, dataSchema.map(_.name).mkString(","))
+  }
+
+  def getTableDec(hiveTable: TableHive): TableDesc = {
+    new TableDesc(
+      hiveTable.getInputFormatClass,
+      hiveTable.getOutputFormatClass,
+      hiveTable.getMetadata)
+  }
+
+  def toAttributes(structType: StructType): Seq[AttributeReference] =
+    structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+  def getInputFormat(
+      ifc: Class[_ <: InputFormat[Writable, Writable]],
+      conf: JobConf): InputFormat[Writable, Writable] = {
+    val newInputFormat = ReflectionUtils.newInstance(ifc.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[Writable, Writable]]
+    newInputFormat match {
+      case c: Configurable => c.setConf(conf)
+      case _ =>
+    }
+    newInputFormat
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala
new file mode 100644
index 000000000..cb0ba0513
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.connector.read.PartitionReaderFactory
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.hiveClientImpl
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory
+
+case class HiveScan(
+    sparkSession: SparkSession,
+    fileIndex: HiveCatalogFileIndex,
+    catalogTable: CatalogTable,
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
+    pushedFilters: Array[Filter] = Array.empty,
+    partitionFilters: Seq[Expression] = Seq.empty,
+    dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
+
+  private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+  private val partFileToHivePartMap: mutable.Map[PartitionedFile, HivePartition] = mutable.Map()
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    val hiveConf = sparkSession.sessionState.newHadoopConf()
+    addCatalogTableConfToConf(hiveConf, catalogTable)
+
+    val table = hiveClientImpl.toHiveTable(catalogTable)
+    HiveReader.initializeHiveConf(table, hiveConf, dataSchema, readDataSchema)
+    val broadcastHiveConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveConf))
+
+    HivePartitionReaderFactory(
+      sparkSession.sessionState.conf.clone(),
+      broadcastHiveConf,
+      table,
+      dataSchema,
+      readDataSchema,
+      readPartitionSchema,
+      partFileToHivePartMap.toMap,
+      pushedFilters = pushedFilters)
+  }
+
+  override protected def partitions: Seq[FilePartition] = {
+    val (selectedPartitions, partDirToHivePartMap) =
+      fileIndex.listHiveFiles(partitionFilters, dataFilters)
+    val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
+    val partitionAttributes = toAttributes(fileIndex.partitionSchema)
+    val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap
+    val readPartitionAttributes = readPartitionSchema.map { readField =>
+      attributeMap.getOrElse(
+        normalizeName(readField.name),
+        throw KyuubiHiveConnectorException(s"Can't find required partition " +
+          s"column ${readField.name} in partition schema ${fileIndex.partitionSchema}"))
+    }
+    lazy val partitionValueProject =
+      GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes)
+    val splitFiles = selectedPartitions.flatMap { partition =>
+      val partitionValues =
+        if (readPartitionAttributes != partitionAttributes) {
+          partitionValueProject(partition.values).copy()
+        } else {
+          partition.values
+        }
+      partition.files.flatMap { file =>
+        val filePath = file.getPath
+        val partFiles = PartitionedFileUtil.splitFiles(
+          sparkSession = sparkSession,
+          file = file,
+          filePath = filePath,
+          isSplitable = isSplitable(filePath),
+          maxSplitBytes = maxSplitBytes,
+          partitionValues = partitionValues)
+
+        if (partDirToHivePartMap.contains(partition)) {
+          partFiles.foreach { partFile =>
+            partFileToHivePartMap += (partFile -> partDirToHivePartMap(partition))
+          }
+        }
+        partFiles
+      }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    }
+
+    if (splitFiles.length == 1) {
+      val path = new Path(splitFiles(0).filePath)
+      if (!isSplitable(path) && splitFiles(0).length >
+          sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold")
+            .getOrElse("1024000000").toLong) {
+        logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+          s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+      }
+    }
+
+    FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
+  }
+
+  private def addCatalogTableConfToConf(hiveConf: Configuration, table: CatalogTable): Unit = {
+    table.properties.foreach {
+      case (key, value) =>
+        hiveConf.set(key, value)
+    }
+  }
+
+  private def normalizeName(name: String): String = {
+    if (isCaseSensitive) {
+      name
+    } else {
+      name.toLowerCase(Locale.ROOT)
+    }
+  }
+
+  def toAttributes(structType: StructType): Seq[AttributeReference] =
+    structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala
new file mode 100644
index 000000000..7297af238
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.types.StructType
+
+case class HiveScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: HiveCatalogFileIndex,
+    dataSchema: StructType,
+    table: CatalogTable)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+
+  override def build(): Scan = {
+    HiveScan(
+      sparkSession = sparkSession,
+      fileIndex = fileIndex,
+      catalogTable = table,
+      dataSchema = dataSchema,
+      readDataSchema = readDataSchema(),
+      readPartitionSchema = readPartitionSchema())
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
new file mode 100644
index 000000000..3305a830e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class HiveTable(
+    sparkSession: SparkSession,
+    catalogTable: CatalogTable,
+    hiveTableCatalog: HiveTableCatalog)
+  extends Table with SupportsRead with SupportsWrite with Logging {
+
+  lazy val dataSchema: StructType = catalogTable.dataSchema
+
+  lazy val partitionSchema: StructType = catalogTable.partitionSchema
+
+  def rootPaths: Seq[Path] = catalogTable.storage.locationUri.map(new Path(_)).toSeq
+
+  lazy val fileIndex: HiveCatalogFileIndex = {
+    val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
+    new HiveCatalogFileIndex(
+      sparkSession,
+      catalogTable,
+      hiveTableCatalog,
+      catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+  }
+
+  override def name(): String = catalogTable.identifier.table
+
+  override def schema(): StructType = catalogTable.schema
+
+  override def properties(): util.Map[String, String] = catalogTable.properties.asJava
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    // TODO: Support write code path fo hive v2.
+    throw new UnsupportedOperationException("Not support write for v2 hive.")
+  }
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(BATCH_READ)
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
new file mode 100644
index 000000000..848d5be4b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.net.URI
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{catalogV2Util, postExternalCatalogEvent, HiveExternalCatalog, HiveMetastoreCatalog, HiveSessionCatalog}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+
+/**
+ * A [[TableCatalog]] that wrap HiveExternalCatalog to as V2 CatalogPlugin instance to access Hive.
+ */
+class HiveTableCatalog(sparkSession: SparkSession)
+  extends TableCatalog with SQLConfHelper with SupportsNamespaces with Logging {
+
+  def this() = this(SparkSession.active)
+
+  private val sc = sparkSession.sparkContext
+
+  private val sessionState = sparkSession.sessionState
+
+  private var catalogName: String = _
+
+  private var catalogOptions: CaseInsensitiveStringMap = _
+
+  var catalog: HiveSessionCatalog = _
+
+  val NAMESPACE_RESERVED_PROPERTIES =
+    Seq(
+      SupportsNamespaces.PROP_COMMENT,
+      SupportsNamespaces.PROP_LOCATION,
+      SupportsNamespaces.PROP_OWNER)
+
+  private lazy val hadoopConf: Configuration = {
+    val conf = sparkSession.sessionState.newHadoopConf()
+    catalogOptions.asScala.foreach {
+      case (k, v) => conf.set(k, v)
+      case _ =>
+    }
+    conf
+  }
+
+  private lazy val sparkConf: SparkConf = {
+    val conf = sparkSession.sparkContext.getConf
+    catalogOptions.asScala.foreach {
+      case (k, v) => conf.set(k, v)
+    }
+    conf
+  }
+
+  def hadoopConfiguration(): Configuration = hadoopConf
+
+  override def name(): String = {
+    require(catalogName != null, "The Hive table catalog is not initialed")
+    catalogName
+  }
+
+  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+    assert(catalogName == null, "The Hive table catalog is already initialed.")
+    assert(
+      conf.getConf(CATALOG_IMPLEMENTATION) == "hive",
+      s"Require setting ${CATALOG_IMPLEMENTATION.key} to `hive` to enable hive support.")
+    catalogName = name
+    catalogOptions = options
+    catalog = new HiveSessionCatalog(
+      externalCatalogBuilder = () => externalCatalog,
+      globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager,
+      metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+      functionRegistry = sessionState.functionRegistry,
+      tableFunctionRegistry = sessionState.tableFunctionRegistry,
+      hadoopConf = hadoopConf,
+      parser = sessionState.sqlParser,
+      functionResourceLoader = sessionState.resourceLoader,
+      HiveUDFExpressionBuilder)
+  }
+
+  /**
+   * A catalog that interacts with external systems.
+   */
+  lazy val externalCatalog: ExternalCatalogWithListener = {
+    val externalCatalog = new HiveExternalCatalog(sparkConf, hadoopConf)
+
+    // Wrap to provide catalog events
+    val wrapped = new ExternalCatalogWithListener(externalCatalog)
+
+    // Make sure we propagate external catalog events to the spark listener bus
+    wrapped.addListener((event: ExternalCatalogEvent) => postExternalCatalogEvent(sc, event))
+
+    wrapped
+  }
+
+  override val defaultNamespace: Array[String] = Array("default")
+
+  override def listTables(namespace: Array[String]): Array[Identifier] = {
+    namespace match {
+      case Array(db) =>
+        catalog
+          .listTables(db)
+          .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
+          .toArray
+      case _ =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  override def loadTable(ident: Identifier): Table = {
+    HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
+  }
+
+  override def createTable(
+      ident: Identifier,
+      schema: StructType,
+      partitions: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.TransformHelper
+    val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
+    val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+    val tableProperties = properties.asScala
+    val location = Option(properties.get(TableCatalog.PROP_LOCATION))
+    val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
+      .copy(locationUri = location.map(CatalogUtils.stringToURI))
+    val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
+    val tableType =
+      if (isExternal || location.isDefined) {
+        CatalogTableType.EXTERNAL
+      } else {
+        CatalogTableType.MANAGED
+      }
+
+    val tableDesc = CatalogTable(
+      identifier = ident.asTableIdentifier,
+      tableType = tableType,
+      storage = storage,
+      schema = schema,
+      provider = Some(provider),
+      partitionColumnNames = partitionColumns,
+      bucketSpec = maybeBucketSpec,
+      properties = tableProperties.toMap,
+      tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
+      comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
+
+    try {
+      catalog.createTable(tableDesc, ignoreIfExists = false)
+    } catch {
+      case _: TableAlreadyExistsException =>
+        throw new TableAlreadyExistsException(ident)
+    }
+
+    loadTable(ident)
+  }
+
+  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+    val catalogTable =
+      try {
+        catalog.getTableMetadata(ident.asTableIdentifier)
+      } catch {
+        case _: NoSuchTableException =>
+          throw new NoSuchTableException(ident)
+      }
+
+    val properties = catalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
+    val schema = catalogV2Util.applySchemaChanges(
+      catalogTable.schema,
+      changes)
+    val comment = properties.get(TableCatalog.PROP_COMMENT)
+    val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+    val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+    val storage =
+      if (location.isDefined) {
+        catalogTable.storage.copy(locationUri = location)
+      } else {
+        catalogTable.storage
+      }
+
+    try {
+      catalog.alterTable(
+        catalogTable.copy(
+          properties = properties,
+          schema = schema,
+          owner = owner,
+          comment = comment,
+          storage = storage))
+    } catch {
+      case _: NoSuchTableException =>
+        throw new NoSuchTableException(ident)
+    }
+
+    loadTable(ident)
+  }
+
+  override def dropTable(ident: Identifier): Boolean = {
+    try {
+      if (loadTable(ident) != null) {
+        catalog.dropTable(
+          ident.asTableIdentifier,
+          ignoreIfNotExists = true,
+          purge = true /* skip HDFS trash */ )
+        true
+      } else {
+        false
+      }
+    } catch {
+      case _: NoSuchTableException =>
+        false
+    }
+  }
+
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+    if (tableExists(newIdent)) {
+      throw new TableAlreadyExistsException(newIdent)
+    }
+
+    // Load table to make sure the table exists
+    loadTable(oldIdent)
+    catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+  }
+
+  private def toOptions(properties: Map[String, String]): Map[String, String] = {
+    properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+      case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
+    }.toMap
+  }
+
+  override def listNamespaces(): Array[Array[String]] = {
+    catalog.listDatabases().map(Array(_)).toArray
+  }
+
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
+    namespace match {
+      case Array() =>
+        listNamespaces()
+      case Array(db) if catalog.databaseExists(db) =>
+        Array()
+      case _ =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
+    namespace match {
+      case Array(db) =>
+        try {
+          catalog.getDatabaseMetadata(db).toMetadata
+        } catch {
+          case _: NoSuchDatabaseException =>
+            throw new NoSuchNamespaceException(namespace)
+        }
+
+      case _ =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  override def createNamespace(
+      namespace: Array[String],
+      metadata: util.Map[String, String]): Unit = namespace match {
+    case Array(db) if !catalog.databaseExists(db) =>
+      catalog.createDatabase(
+        toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
+        ignoreIfExists = false)
+
+    case Array(_) =>
+      throw new NamespaceAlreadyExistsException(namespace)
+
+    case _ =>
+      throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+  }
+
+  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
+    namespace match {
+      case Array(db) =>
+        // validate that this catalog's reserved properties are not removed
+        changes.foreach {
+          case remove: RemoveProperty if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
+            throw new UnsupportedOperationException(
+              s"Cannot remove reserved property: ${remove.property}")
+          case _ =>
+        }
+
+        val metadata = catalog.getDatabaseMetadata(db).toMetadata
+        catalog.alterDatabase(
+          toCatalogDatabase(db, catalogV2Util.applyNamespaceChanges(metadata, changes)))
+
+      case _ =>
+        throw new NoSuchNamespaceException(namespace)
+    }
+  }
+
+  /**
+   * List the metadata of partitions that belong to the specified table, assuming it exists, that
+   * satisfy the given partition-pruning predicate expressions.
+   */
+  def listPartitionsByFilter(
+      tableName: TableIdentifier,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    catalog.listPartitionsByFilter(tableName, predicates)
+  }
+
+  def listPartitions(
+      tableName: TableIdentifier,
+      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
+    catalog.listPartitions(tableName, partialSpec)
+  }
+
+  override def dropNamespace(
+      namespace: Array[String],
+      cascade: Boolean): Boolean = namespace match {
+    case Array(db) if catalog.databaseExists(db) =>
+      if (catalog.listTables(db).nonEmpty) {
+        throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+      }
+      catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
+      true
+
+    case Array(_) =>
+      // exists returned false
+      false
+
+    case _ =>
+      throw new NoSuchNamespaceException(namespace)
+  }
+}
+
+private object HiveTableCatalog {
+  private def toCatalogDatabase(
+      db: String,
+      metadata: util.Map[String, String],
+      defaultLocation: Option[URI] = None): CatalogDatabase = {
+    CatalogDatabase(
+      name = db,
+      description = metadata.getOrDefault(SupportsNamespaces.PROP_COMMENT, ""),
+      locationUri = Option(metadata.get(SupportsNamespaces.PROP_LOCATION))
+        .map(CatalogUtils.stringToURI)
+        .orElse(defaultLocation)
+        .getOrElse(throw new IllegalArgumentException("Missing database location")),
+      properties = metadata.asScala.toMap --
+        Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION))
+  }
+
+  implicit class NamespaceHelper(namespace: Array[String]) {
+    def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
+  }
+
+  implicit class IdentifierHelper(ident: Identifier) {
+    def quoted: String = {
+      if (ident.namespace.nonEmpty) {
+        ident.namespace.map(quoteIfNeeded).mkString(".") + "." + quoteIfNeeded(ident.name)
+      } else {
+        quoteIfNeeded(ident.name)
+      }
+    }
+
+    def asMultipartIdentifier: Seq[String] = ident.namespace :+ ident.name
+
+    def asTableIdentifier: TableIdentifier = ident.namespace match {
+      case ns if ns.isEmpty => TableIdentifier(ident.name)
+      case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
+      case _ =>
+        throw KyuubiHiveConnectorException(
+          s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+    }
+  }
+
+  implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
+    def toMetadata: util.Map[String, String] = {
+      val metadata = mutable.HashMap[String, String]()
+
+      catalogDatabase.properties.foreach {
+        case (key, value) => metadata.put(key, value)
+      }
+      metadata.put(SupportsNamespaces.PROP_LOCATION, catalogDatabase.locationUri.toString)
+      metadata.put(SupportsNamespaces.PROP_COMMENT, catalogDatabase.description)
+
+      metadata.asJava
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala
new file mode 100644
index 000000000..7cdcc211f
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+case class KyuubiHiveConnectorException(
+    message: String,
+    cause: Option[Throwable] = None) extends Exception(message, cause.orNull)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
new file mode 100644
index 000000000..fe8564ca8
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.io.{FileNotFoundException, IOException}
+
+import org.apache.parquet.io.ParquetDecodingException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.inputFileBlockHolder
+import org.apache.spark.sql.internal.SQLConf
+
+// scalastyle:off line.size.limit
+// copy from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+// scalastyle:on line.size.limit
+class FilePartitionReader[T](readers: Iterator[HivePartitionedFileReader[T]])
+  extends PartitionReader[T] with Logging {
+  private var currentReader: HivePartitionedFileReader[T] = null
+
+  private val sqlConf = SQLConf.get
+  private def ignoreMissingFiles = sqlConf.ignoreMissingFiles
+  private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+
+  override def next(): Boolean = {
+    if (currentReader == null) {
+      if (readers.hasNext) {
+        try {
+          currentReader = getNextReader()
+        } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file.", e)
+            currentReader = null
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles =>
+            throw fileNotFoundError(e)
+          case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest of the content in the corrupted file.",
+              e)
+            currentReader = null
+        }
+      } else {
+        return false
+      }
+    }
+
+    // In PartitionReader.next(), the current reader proceeds to next record.
+    // It might throw RuntimeException/IOException and Spark should handle these exceptions.
+    val hasNext =
+      try {
+        currentReader != null && currentReader.next()
+      } catch {
+        case e @ (_: SchemaColumnConvertNotSupportedException
+            | _: ParquetDecodingException) =>
+          throw e
+        case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+          logWarning(
+            s"Skipped the rest of the content in the corrupted file: $currentReader",
+            e)
+          false
+      }
+    if (hasNext) {
+      true
+    } else {
+      close()
+      currentReader = null
+      next()
+    }
+  }
+
+  override def get(): T = currentReader.get()
+
+  override def close(): Unit = {
+    if (currentReader != null) {
+      currentReader.close()
+    }
+    inputFileBlockHolder.unset()
+  }
+
+  private def getNextReader(): HivePartitionedFileReader[T] = {
+    val reader = readers.next()
+    logInfo(s"Reading file $reader")
+    // Sets InputFileBlockHolder for the file block's information
+    val file = reader.file
+    inputFileBlockHolder.set(file.filePath, file.start, file.length)
+    reader
+  }
+
+  def fileNotFoundError(e: FileNotFoundException): Throwable = {
+    new FileNotFoundException(
+      e.getMessage + "\n" +
+        "It is possible the underlying files have been updated. " +
+        "You can explicitly invalidate the cache in Spark by " +
+        "recreating the Dataset/DataFrame involved.")
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
new file mode 100644
index 000000000..416fdb2eb
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.NextIterator
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.spark.connector.hive.HiveReader
+
+case class HivePartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastHiveConf: Broadcast[SerializableConfiguration],
+    hiveTable: HiveTable,
+    dataSchema: StructType,
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    partFileToHivePart: Map[PartitionedFile, HivePartition],
+    pushedFilters: Array[Filter] = Array.empty)
+  extends FilePartitionReaderFactory with Logging {
+
+  private val charset: String =
+    sqlConf.getConfString("hive.exec.default.charset", "utf-8")
+
+  val tableDesc = HiveReader.getTableDec(hiveTable)
+  val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema)
+
+  override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+    throw new UnsupportedOperationException("Cannot use buildReader directly.")
+  }
+
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+    assert(partition.isInstanceOf[FilePartition])
+    val filePartition = partition.asInstanceOf[FilePartition]
+    val iter: Iterator[HivePartitionedFileReader[InternalRow]] =
+      filePartition.files.toIterator.map { file =>
+        val bindHivePart = partFileToHivePart.getOrElse(file, null)
+        HivePartitionedFileReader(
+          file,
+          new PartitionReaderWithPartitionValues(
+            HivePartitionedReader(
+              file,
+              buildReaderInternal(file, bindHivePart),
+              tableDesc,
+              broadcastHiveConf,
+              nonPartitionReadDataKeys,
+              bindHivePart,
+              charset),
+            readDataSchema,
+            partitionSchema,
+            file.partitionValues))
+      }
+    new FilePartitionReader[InternalRow](iter)
+  }
+
+  def buildReaderInternal(
+      file: PartitionedFile,
+      bindPartition: HivePartition): PartitionReader[Writable] = {
+    val reader = createPartitionWritableReader(file, bindPartition)
+    val fileReader = new PartitionReader[Writable] {
+      override def next(): Boolean = reader.hasNext
+      override def get(): Writable = reader.next()
+      override def close(): Unit = {}
+    }
+    fileReader
+  }
+
+  private def createPartitionWritableReader[T](
+      file: PartitionedFile,
+      bindPartition: HivePartition): Iterator[Writable] = {
+    // Obtain binding HivePartition from input partitioned file
+    val partDesc =
+      if (bindPartition != null) {
+        Utilities.getPartitionDesc(bindPartition)
+      } else null
+
+    val ifc =
+      if (partDesc == null) {
+        hiveTable.getInputFormatClass
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      } else {
+        partDesc.getInputFileFormatClass
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      }
+
+    val jobConf = new JobConf(broadcastHiveConf.value.value)
+
+    val filePath = new Path(new URI(file.filePath))
+
+    if (tableDesc != null) {
+      configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
+      Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
+    }
+
+    val eventIter: NextIterator[Writable] = new NextIterator[Writable] {
+      // Initial 'FileSplit' and 'InputFormat' instance for Record reader construction
+      private val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+      private val inputFormat: InputFormat[Writable, Writable] =
+        HiveReader.getInputFormat(ifc, jobConf)
+      logDebug(s"Final input format $inputFormat")
+      var reader: RecordReader[Writable, Writable] =
+        try {
+          inputFormat.getRecordReader(fileSplit.asInstanceOf[InputSplit], jobConf, Reporter.NULL)
+        } catch {
+          case e: Exception =>
+            logError("Exception raised when creating iterator reader", e)
+            throw e
+        }
+      val key: Writable =
+        if (reader == null) null.asInstanceOf[Writable] else reader.createKey()
+      val value: Writable =
+        if (reader == null) null.asInstanceOf[Writable] else reader.createValue()
+
+      override def getNext(): Writable = {
+        try {
+          finished = !reader.next(key, value)
+        } catch {
+          case e: Exception =>
+            logError(s"Exception raised when reading corrupt file: $fileSplit", e)
+            throw e
+        }
+        value
+      }
+
+      override def close(): Unit = {
+        if (reader != null) {
+          try {
+            reader.close()
+          } catch {
+            case e @ (_: Exception | _: Throwable) =>
+              logError("Exception in RecordReader.close()", e)
+          } finally {
+            reader = null
+          }
+        }
+      }
+    }
+
+    Option(TaskContext.get())
+      .foreach(_.addTaskCompletionListener[Unit](_ => eventIter.closeIfNeeded()))
+    eventIter
+  }
+
+  def configureJobPropertiesForStorageHandler(
+      tableDesc: TableDesc,
+      conf: Configuration,
+      input: Boolean): Unit = {
+    val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
+    val storageHandler =
+      org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property)
+    if (storageHandler != null) {
+      val jobProperties = new java.util.LinkedHashMap[String, String]
+      if (input) {
+        storageHandler.configureInputJobProperties(tableDesc, jobProperties)
+      } else {
+        storageHandler.configureOutputJobProperties(tableDesc, jobProperties)
+      }
+      if (!jobProperties.isEmpty) {
+        tableDesc.setJobProperties(jobProperties)
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala
new file mode 100644
index 000000000..fa0adf96b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+case class HivePartitionedFileReader[T](
+    file: PartitionedFile,
+    reader: PartitionReader[T]) extends PartitionReader[T] {
+  override def next(): Boolean = reader.next()
+
+  override def get(): T = reader.get()
+
+  override def close(): Unit = reader.close()
+
+  override def toString: String = file.toString
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
new file mode 100644
index 000000000..108df65d4
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.util.Properties
+
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.io.Writable
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{hadoopTableReader, hiveShim}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
+
+case class HivePartitionedReader(
+    file: PartitionedFile,
+    reader: PartitionReader[Writable],
+    tableDesc: TableDesc,
+    broadcastHiveConf: Broadcast[SerializableConfiguration],
+    nonPartitionReadDataKeys: Seq[Attribute],
+    bindPartition: HivePartition,
+    charset: String = "utf-8") extends PartitionReader[InternalRow] with Logging {
+
+  private val partDesc =
+    if (bindPartition != null) {
+      Utilities.getPartitionDesc(bindPartition)
+    } else null
+  private val hiveConf = broadcastHiveConf.value.value
+
+  private val tableDeser = tableDesc.getDeserializerClass.newInstance()
+  tableDeser.initialize(hiveConf, tableDesc.getProperties)
+
+  private val localDeser: Deserializer =
+    if (bindPartition != null &&
+      bindPartition.getDeserializer != null) {
+      val tableProperties = tableDesc.getProperties
+      val props = new Properties(tableProperties)
+      val deserializer =
+        bindPartition.getDeserializer.getClass.asInstanceOf[Class[Deserializer]].newInstance()
+      deserializer.initialize(hiveConf, props)
+      deserializer
+    } else {
+      tableDeser
+    }
+
+  private val internalRow = new SpecificInternalRow(nonPartitionReadDataKeys.map(_.dataType))
+
+  private val soi: StructObjectInspector =
+    if (localDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
+      localDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
+    } else {
+      ObjectInspectorConverters.getConvertedOI(
+        localDeser.getObjectInspector,
+        tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
+    }
+
+  private val (fieldRefs, fieldOrdinals) = nonPartitionReadDataKeys.zipWithIndex
+    .map { case (attr, ordinal) =>
+      soi.getStructFieldRef(attr.name) -> ordinal
+    }.toArray.unzip
+
+  private val unwrappers: Seq[(Any, InternalRow, Int) => Unit] = fieldRefs.map {
+    _.getFieldObjectInspector match {
+      case oi: BooleanObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+      case oi: ByteObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+      case oi: ShortObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+      case oi: IntObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+      case oi: LongObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+      case oi: FloatObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+      case oi: DoubleObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+      case oi: HiveVarcharObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
+      case oi: HiveCharObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
+      case oi: HiveDecimalObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.update(ordinal, hiveShim.toCatalystDecimal(oi, value))
+      case oi: TimestampObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
+      case oi: DateObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+      case oi: BinaryObjectInspector =>
+        (value: Any, row: InternalRow, ordinal: Int) =>
+          row.update(ordinal, oi.getPrimitiveJavaObject(value))
+      case oi =>
+        logDebug("HiveInspector class: " + oi.getClass.getName + ", charset: " + charset)
+        val unwrapper = hadoopTableReader.unwrapperFor(oi)
+        (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+    }
+  }
+
+  private val converter =
+    ObjectInspectorConverters.getConverter(localDeser.getObjectInspector, soi)
+
+  private def fillObject(value: Writable, mutableRow: InternalRow): InternalRow = {
+    val raw = converter.convert(localDeser.deserialize(value))
+    var i = 0
+    val length = fieldRefs.length
+    while (i < length) {
+      val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+      if (fieldValue == null) {
+        mutableRow.setNullAt(fieldOrdinals(i))
+      } else {
+        unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+      }
+      i += 1
+    }
+    mutableRow
+  }
+
+  override def next(): Boolean = reader.next()
+
+  override def get(): InternalRow = fillObject(reader.get(), internalRow)
+
+  override def close(): Unit = reader.close()
+
+  override def toString: String = file.toString
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala
new file mode 100644
index 000000000..19257f0f6
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.kyuubi.connector
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim, HiveTableUtil}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+object HiveConnectorHelper {
+
+  type HiveSessionCatalog = org.apache.spark.sql.hive.HiveSessionCatalog
+  type HiveMetastoreCatalog = org.apache.spark.sql.hive.HiveMetastoreCatalog
+  type HiveExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog
+  type BucketSpecHelper = org.apache.spark.sql.connector.catalog.CatalogV2Implicits.BucketSpecHelper
+  type NextIterator[U] = org.apache.spark.util.NextIterator[U]
+  val logicalExpressions: LogicalExpressions.type = LogicalExpressions
+  val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+  val catalogV2Util: CatalogV2Util.type = CatalogV2Util
+  val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
+  val hiveShim: HiveShim.type = HiveShim
+  val inputFileBlockHolder: InputFileBlockHolder.type = InputFileBlockHolder
+  val hadoopTableReader: HadoopTableReader.type = HadoopTableReader
+
+  def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
+    sc.listenerBus.post(event)
+  }
+
+  implicit class TransformHelper(transforms: Seq[Transform]) {
+    def convertTransforms: (Seq[String], Option[BucketSpec]) = {
+      val identityCols = new mutable.ArrayBuffer[String]
+      var bucketSpec = Option.empty[BucketSpec]
+
+      transforms.map {
+        case IdentityTransform(FieldReference(Seq(col))) =>
+          identityCols += col
+
+        case BucketTransform(numBuckets, col, sortCol) =>
+          if (bucketSpec.nonEmpty) {
+            throw new UnsupportedOperationException("Multiple bucket transforms are not supported.")
+          }
+          if (sortCol.isEmpty) {
+            bucketSpec = Some(BucketSpec(numBuckets, col.map(_.fieldNames.mkString(".")), Nil))
+          } else {
+            bucketSpec = Some(BucketSpec(
+              numBuckets,
+              col.map(_.fieldNames.mkString(".")),
+              sortCol.map(_.fieldNames.mkString("."))))
+          }
+
+        case transform =>
+          throw new UnsupportedOperationException(
+            s"Unsupported partition transform: $transform")
+      }
+
+      (identityCols.toSeq, bucketSpec)
+    }
+  }
+
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
new file mode 100644
index 000000000..f56b7577d
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
+
+class HiveCatalogSuite extends KyuubiFunSuite {
+
+  test("get catalog name") {
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .set("spark.ui.enabled", "false")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
+    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+      val catalog = new HiveTableCatalog
+      val catalogName = "v2hive"
+      catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
+      assert(catalog.name() == catalogName)
+    }
+  }
+
+  test("supports namespaces") {
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .set("spark.ui.enabled", "false")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.catalog.v2hive_namespaces", classOf[HiveTableCatalog].getName)
+    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+      try {
+        spark.sql("USE v2hive_namespaces")
+        spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1")
+        assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2)
+      } finally {
+        spark.sql("DROP NAMESPACE IF EXISTS ns1")
+      }
+    }
+  }
+
+  test("nonexistent table") {
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .set("spark.ui.enabled", "false")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
+    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+      val exception = intercept[AnalysisException] {
+        spark.table("v2hive.ns1.nonexistent_table")
+      }
+      assert(exception.message === "Table or view not found: v2hive.ns1.nonexistent_table")
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala
new file mode 100644
index 000000000..090558383
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
+
+class HiveConnectorSuite extends KyuubiFunSuite {
+
+  def withTempTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+    spark.sql(
+      s"""
+         | CREATE TABLE IF NOT EXISTS
+         | $table (id String, date String)
+         | USING PARQUET
+         | PARTITIONED BY (date)
+         |""".stripMargin).collect()
+    try f
+    finally spark.sql(s"DROP TABLE $table")
+  }
+
+  test("simple query") {
+    val sparkConf = new SparkConf()
+      .setMaster("local[*]")
+      .set("spark.ui.enabled", "false")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.catalog.hivev2", classOf[HiveTableCatalog].getName)
+    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+      val table = "default.employee"
+      withTempTable(spark, table) {
+        spark.sql(
+          s"""
+             | INSERT INTO
+             | $table
+             | VALUES("yi", "2022-08-08")
+             |""".stripMargin).collect()
+
+        // can query an existing Hive table in three sections
+        val result = spark.sql(
+          s"""
+             | SELECT * FROM hivev2.$table
+             |""".stripMargin)
+        assert(result.collect().head == Row("yi", "2022-08-08"))
+
+        // error msg should contains catalog info if table is not exist
+        val e = intercept[AnalysisException] {
+          spark.sql(
+            s"""
+               | SELECT * FROM hivev2.ns1.tb1
+               |""".stripMargin)
+        }
+        assert(e.getMessage().contains("Table or view not found: hivev2.ns1.tb1"))
+      }
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index cb81f5fe4..ea6484897 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2185,6 +2185,7 @@
             <modules>
                 <module>extensions/spark/kyuubi-extension-spark-common</module>
                 <module>extensions/spark/kyuubi-extension-spark-3-3</module>
+                <module>extensions/spark/kyuubi-spark-connector-hive</module>
                 <module>extensions/spark/kyuubi-spark-connector-kudu</module>
             </modules>
         </profile>