Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/23 05:48:34 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d822b3eba [KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2
d822b3eba is described below
commit d822b3eba3050eb6a3908cf9c6c059ca29f82100
Author: yikf <yi...@gmail.com>
AuthorDate: Tue Aug 23 13:48:21 2022 +0800
[KYUUBI #3259] Initial implementation of the Hive Connector based on the Spark datasource V2
### _Why are the changes needed?_
In a modern database architecture, users may have a strong need for federated queries. Since a large number of Hive warehouses exist in historical databases, we implement a Hive V2 datasource based on Spark DataSource V2 to meet this need. For the discussion, see https://lists.apache.org/thread/fq8ywr58rzf9bycflj1q4fl1xyz2rq2w
This PR is the first step toward fixing https://github.com/apache/incubator-kyuubi/issues/3259, providing:
- the initial implementation
- support for the read code path (see the usage sketch below)
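As a quick orientation (not part of the diff below), here is a minimal usage sketch of the new catalog plugin, assuming the connector jar is on the classpath; the catalog name `hive_catalog` and the table `default.src` are hypothetical placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Register the V2 catalog plugin added by this commit. HiveTableCatalog
// asserts spark.sql.catalogImplementation=hive at initialization, which
// enableHiveSupport() sets.
val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport()
  .config(
    "spark.sql.catalog.hive_catalog",
    "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
  .getOrCreate()

// Read path only for now: newWriteBuilder throws UnsupportedOperationException.
spark.sql("SELECT * FROM hive_catalog.default.src").show()
```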
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3260 from yikf/hive-v2-connector.
Closes #3259
753aca30 [yikf] Initial implementation of the Hive Connector based on the Spark datasource V2
Authored-by: yikf <yi...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../spark/kyuubi-spark-connector-hive/pom.xml | 216 +++++++++++
.../spark/connector/hive/HiveFileIndex.scala | 256 +++++++++++++
.../kyuubi/spark/connector/hive/HiveReader.scala | 106 ++++++
.../kyuubi/spark/connector/hive/HiveScan.scala | 147 ++++++++
.../spark/connector/hive/HiveScanBuilder.scala | 42 +++
.../kyuubi/spark/connector/hive/HiveTable.scala | 74 ++++
.../spark/connector/hive/HiveTableCatalog.scala | 415 +++++++++++++++++++++
.../hive/KyuubiHiveConnectorException.scala | 22 ++
.../connector/hive/read/FilePartitionReader.scala | 112 ++++++
.../hive/read/HivePartitionReaderFactory.scala | 197 ++++++++++
.../hive/read/HivePartitionedFileReader.scala | 33 ++
.../hive/read/HivePartitionedReader.scala | 154 ++++++++
.../kyuubi/connector/HiveConnectorHelper.scala | 80 ++++
.../spark/connector/hive/HiveCatalogSuite.scala | 73 ++++
.../spark/connector/hive/HiveConnectorSuite.scala | 74 ++++
pom.xml | 1 +
16 files changed, 2002 insertions(+)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/pom.xml b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
new file mode 100644
index 000000000..9f0912992
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/pom.xml
@@ -0,0 +1,216 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-parent</artifactId>
+ <version>1.7.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kyuubi-spark-connector-hive</artifactId>
+ <name>Kyuubi Spark Hive Connector</name>
+ <description>A Kyuubi Hive connector based on Spark DataSource V2</description>
+ <packaging>jar</packaging>
+ <url>https://kyuubi.apache.org/</url>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-spark-connector-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatestplus</groupId>
+ <artifactId>scalacheck-1-15_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+ <!--
+ Spark requires `commons-collections` and `commons-io` but gets them from transitive
+ dependencies of `hadoop-client`. As we are using the Hadoop Shaded Client, we need to
+ add them explicitly. See more details at SPARK-33212.
+ -->
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>false</shadedArtifactAttached>
+ <artifactSet>
+ <includes>
+ <include>com.google.guava:guava</include>
+ <include>org.apache.kyuubi:kyuubi-spark-connector-common_${scala.binary.version}</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>${kyuubi.shade.packageName}.com.google.common</shadedPattern>
+ <includes>
+ <include>com.google.common.**</include>
+ </includes>
+ </relocation>
+ </relocations>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala
new file mode 100644
index 000000000..0a8ec26b4
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveFileIndex.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.net.URI
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{expressions, InternalRow}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, Predicate}
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.hiveClientImpl
+import org.apache.spark.sql.types.StructType
+
+class HiveCatalogFileIndex(
+ sparkSession: SparkSession,
+ val catalogTable: CatalogTable,
+ hiveCatalog: HiveTableCatalog,
+ override val sizeInBytes: Long)
+ extends PartitioningAwareFileIndex(
+ sparkSession,
+ catalogTable.storage.properties,
+ Some(catalogTable.schema)) {
+
+ private val table = catalogTable
+
+ private val partPathToBindHivePart: mutable.Map[PartitionPath, HivePartition] = mutable.Map()
+
+ private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+ private lazy val hiveTable: Table = hiveClientImpl.toHiveTable(table)
+
+ private val baseLocation: Option[URI] = table.storage.locationUri
+
+ override def partitionSchema: StructType = table.partitionSchema
+
+ private[hive] def listHiveFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression])
+ : (Seq[PartitionDirectory], Map[PartitionDirectory, HivePartition]) = {
+ val fileIndex = filterPartitions(partitionFilters)
+ val partDirs = fileIndex.listFiles(partitionFilters, dataFilters)
+ val partDirToHivePart = fileIndex.partDirToBindHivePartMap()
+ (partDirs, partDirToHivePart)
+ }
+
+ def filterPartitions(filters: Seq[Expression]): HiveInMemoryFileIndex = {
+ if (table.partitionColumnNames.nonEmpty) {
+ val startTime = System.nanoTime()
+ val selectedPartitions: Seq[BindPartition] = {
+ val partitions =
+ ExternalCatalogUtils.listPartitionsByFilter(
+ sparkSession.sessionState.conf,
+ hiveCatalog.catalog,
+ table,
+ filters)
+ partitions.map(buildBindPartition)
+ }
+
+ val partitions = selectedPartitions.map {
+ case BindPartition(catalogTablePartition, hivePartition) =>
+ val path = new Path(catalogTablePartition.location)
+ val fs = path.getFileSystem(hadoopConf)
+ val partPath = PartitionPath(
+ catalogTablePartition.toRow(
+ partitionSchema,
+ sparkSession.sessionState.conf.sessionLocalTimeZone),
+ path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+ partPathToBindHivePart += (partPath -> hivePartition)
+ partPath
+ }
+ val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ val timeNs = System.nanoTime() - startTime
+ new HiveInMemoryFileIndex(
+ sparkSession = sparkSession,
+ rootPathsSpecified = partitionSpec.partitions.map(_.path),
+ parameters = table.properties,
+ partPathToBindHivePart = partPathToBindHivePart.toMap,
+ userSpecifiedSchema = Some(partitionSpec.partitionColumns),
+ fileStatusCache = fileStatusCache,
+ userSpecifiedPartitionSpec = Some(partitionSpec),
+ metadataOpsTimeNs = Some(timeNs))
+ } else {
+ new HiveInMemoryFileIndex(
+ sparkSession = sparkSession,
+ rootPathsSpecified = rootPaths,
+ parameters = table.properties,
+ userSpecifiedSchema = None,
+ fileStatusCache = fileStatusCache)
+ }
+ }
+
+ private def buildBindPartition(partition: CatalogTablePartition): BindPartition =
+ BindPartition(partition, hiveClientImpl.toHivePartition(partition, hiveTable))
+
+ override def partitionSpec(): PartitionSpec = {
+ throw notSupportOperator("partitionSpec")
+ }
+
+ override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+ throw notSupportOperator("leafFiles")
+ }
+
+ override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+ throw notSupportOperator("leafDirToChildrenFiles")
+ }
+
+ override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
+
+ override def refresh(): Unit = fileStatusCache.invalidateAll()
+
+ def notSupportOperator(operator: String): UnsupportedOperationException = {
+ new UnsupportedOperationException(s"Unsupported operation $operator in Hive file index.")
+ }
+}
+
+class HiveInMemoryFileIndex(
+ sparkSession: SparkSession,
+ rootPathsSpecified: Seq[Path],
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ partPathToBindHivePart: Map[PartitionPath, HivePartition] = Map.empty,
+ fileStatusCache: FileStatusCache = NoopCache,
+ userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+ override val metadataOpsTimeNs: Option[Long] = None)
+ extends InMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ parameters,
+ userSpecifiedSchema,
+ fileStatusCache,
+ userSpecifiedPartitionSpec,
+ metadataOpsTimeNs) {
+
+ private val partDirToBindHivePart: mutable.Map[PartitionDirectory, HivePartition] = mutable.Map()
+
+ override def listFiles(
+ partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ def isNonEmptyFile(f: FileStatus): Boolean = {
+ isDataPath(f.getPath) && f.getLen > 0
+ }
+ val selectedPartitions =
+ if (partitionSpec().partitionColumns.isEmpty) {
+ PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
+ } else {
+ if (recursiveFileLookup) {
+ throw new IllegalArgumentException(
+ "Datasource with partition do not allow recursive file loading.")
+ }
+ prunePartitions(partitionFilters, partitionSpec()).map {
+ case partPath @ PartitionPath(values, path) =>
+ val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+ case Some(existingDir) =>
+ // Directory has children files in it, return them
+ existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
+
+ case None =>
+ // Directory does not exist, or has no child files
+ Nil
+ }
+ val partDir = PartitionDirectory(values, files)
+ // Update Partition Directory -> binding Hive part map
+ updatePartDirHivePartitionMapping(partDir, partPath)
+
+ partDir
+ }
+ }
+ logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
+ selectedPartitions
+ }
+
+ private def prunePartitions(
+ predicates: Seq[Expression],
+ partitionSpec: PartitionSpec): Seq[PartitionPath] = {
+ val PartitionSpec(partitionColumns, partitions) = partitionSpec
+ val partitionColumnNames = partitionColumns.map(_.name).toSet
+ val partitionPruningPredicates = predicates.filter {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+
+ if (partitionPruningPredicates.nonEmpty) {
+ val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+ val boundPredicate = Predicate.createInterpreted(predicate.transform {
+ case a: AttributeReference =>
+ val index = partitionColumns.indexWhere(a.name == _.name)
+ BoundReference(index, partitionColumns(index).dataType, nullable = true)
+ })
+
+ val selected = partitions.filter {
+ case PartitionPath(values, _) => boundPredicate.eval(values)
+ }
+ logInfo {
+ val total = partitions.length
+ val selectedSize = selected.length
+ val percentPruned = (1 - selectedSize.toDouble / total.toDouble) * 100
+ s"Selected $selectedSize partitions out of $total, " +
+ s"pruned ${if (total == 0) "0" else s"$percentPruned%"} partitions."
+ }
+
+ selected
+ } else {
+ partitions
+ }
+ }
+
+ private def isDataPath(path: Path): Boolean = {
+ val name = path.getName
+ !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+ }
+
+ def partDirToBindHivePartMap(): Map[PartitionDirectory, HivePartition] = {
+ partDirToBindHivePart.toMap
+ }
+
+ def updatePartDirHivePartitionMapping(
+ partDir: PartitionDirectory,
+ partPath: PartitionPath): Unit = {
+ if (partPathToBindHivePart.contains(partPath)) {
+ partDirToBindHivePart += (partDir -> partPathToBindHivePart(partPath))
+ }
+ }
+}
+
+case class BindPartition(catalogTablePartition: CatalogTablePartition, hivePartition: HivePartition)
+
+object HiveTableCatalogFileIndex {
+ implicit class CatalogHelper(plugin: CatalogPlugin) {
+ def asHiveCatalog: HiveTableCatalog = plugin match {
+ case hiveTableCatalog: HiveTableCatalog =>
+ hiveTableCatalog
+ case _ =>
+ throw KyuubiHiveConnectorException(
+ s"Cannot use catalog ${plugin.name}: not a HiveTableCatalog")
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala
new file mode 100644
index 000000000..bd8dadbbe
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveReader.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.hive.ql.metadata.{Table => TableHive}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{hiveShim, hiveTableUtil}
+import org.apache.spark.sql.types.StructType
+
+object HiveReader {
+
+ def initializeHiveConf(
+ hiveTable: TableHive,
+ hiveConf: Configuration,
+ dataSchema: StructType,
+ readDataSchema: StructType): Unit = {
+ // Build tableDesc from hiveTable
+ val tableDesc = getTableDesc(hiveTable)
+ // Set required columns and serde columns to hiveConf according to schema
+ addColumnMetadataToConf(tableDesc, hiveConf, dataSchema, readDataSchema)
+ // Copy hive table properties to hiveConf. For example,
+ // initialize the job conf to read files with the specified format
+ hiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, hiveConf, false)
+ }
+
+ private def addColumnMetadataToConf(
+ tableDesc: TableDesc,
+ hiveConf: Configuration,
+ dataSchema: StructType,
+ readDataSchema: StructType): Unit = {
+ // Set the required column ids and names in hiveConf, so the Hive file reader
+ // scans only the requested data columns, excluding the partition schema
+ val neededColumnNames = readDataSchema.map(_.name)
+ val neededColumnIDs =
+ readDataSchema.map(field => Integer.valueOf(dataSchema.fields.indexOf(field)))
+
+ hiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames)
+
+ val deserializer = tableDesc.getDeserializerClass.newInstance
+ deserializer.initialize(hiveConf, tableDesc.getProperties)
+
+ // Specifies types and object inspectors of columns to be scanned.
+ val structOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ deserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+ val columnTypeNames = structOI
+ .getAllStructFieldRefs.asScala
+ .map(_.getFieldObjectInspector)
+ .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+ .mkString(",")
+
+ hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+ hiveConf.set(serdeConstants.LIST_COLUMNS, dataSchema.map(_.name).mkString(","))
+ }
+
+ def getTableDesc(hiveTable: TableHive): TableDesc = {
+ new TableDesc(
+ hiveTable.getInputFormatClass,
+ hiveTable.getOutputFormatClass,
+ hiveTable.getMetadata)
+ }
+
+ def toAttributes(structType: StructType): Seq[AttributeReference] =
+ structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+ def getInputFormat(
+ ifc: Class[_ <: InputFormat[Writable, Writable]],
+ conf: JobConf): InputFormat[Writable, Writable] = {
+ val newInputFormat = ReflectionUtils.newInstance(ifc.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[Writable, Writable]]
+ newInputFormat match {
+ case c: Configurable => c.setConf(conf)
+ case _ =>
+ }
+ newInputFormat
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala
new file mode 100644
index 000000000..cb0ba0513
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScan.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.util.Locale
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.connector.read.PartitionReaderFactory
+import org.apache.spark.sql.execution.PartitionedFileUtil
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.hiveClientImpl
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory
+
+case class HiveScan(
+ sparkSession: SparkSession,
+ fileIndex: HiveCatalogFileIndex,
+ catalogTable: CatalogTable,
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ pushedFilters: Array[Filter] = Array.empty,
+ partitionFilters: Seq[Expression] = Seq.empty,
+ dataFilters: Seq[Expression] = Seq.empty) extends FileScan {
+
+ private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+ private val partFileToHivePartMap: mutable.Map[PartitionedFile, HivePartition] = mutable.Map()
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ val hiveConf = sparkSession.sessionState.newHadoopConf()
+ addCatalogTableConfToConf(hiveConf, catalogTable)
+
+ val table = hiveClientImpl.toHiveTable(catalogTable)
+ HiveReader.initializeHiveConf(table, hiveConf, dataSchema, readDataSchema)
+ val broadcastHiveConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveConf))
+
+ HivePartitionReaderFactory(
+ sparkSession.sessionState.conf.clone(),
+ broadcastHiveConf,
+ table,
+ dataSchema,
+ readDataSchema,
+ readPartitionSchema,
+ partFileToHivePartMap.toMap,
+ pushedFilters = pushedFilters)
+ }
+
+ override protected def partitions: Seq[FilePartition] = {
+ val (selectedPartitions, partDirToHivePartMap) =
+ fileIndex.listHiveFiles(partitionFilters, dataFilters)
+ val maxSplitBytes = FilePartition.maxSplitBytes(sparkSession, selectedPartitions)
+ val partitionAttributes = toAttributes(fileIndex.partitionSchema)
+ val attributeMap = partitionAttributes.map(a => normalizeName(a.name) -> a).toMap
+ val readPartitionAttributes = readPartitionSchema.map { readField =>
+ attributeMap.getOrElse(
+ normalizeName(readField.name),
+ throw KyuubiHiveConnectorException(s"Can't find required partition " +
+ s"column ${readField.name} in partition schema ${fileIndex.partitionSchema}"))
+ }
+ lazy val partitionValueProject =
+ GenerateUnsafeProjection.generate(readPartitionAttributes, partitionAttributes)
+ val splitFiles = selectedPartitions.flatMap { partition =>
+ val partitionValues =
+ if (readPartitionAttributes != partitionAttributes) {
+ partitionValueProject(partition.values).copy()
+ } else {
+ partition.values
+ }
+ partition.files.flatMap { file =>
+ val filePath = file.getPath
+ val partFiles = PartitionedFileUtil.splitFiles(
+ sparkSession = sparkSession,
+ file = file,
+ filePath = filePath,
+ isSplitable = isSplitable(filePath),
+ maxSplitBytes = maxSplitBytes,
+ partitionValues = partitionValues)
+
+ if (partDirToHivePartMap.contains(partition)) {
+ partFiles.foreach { partFile =>
+ partFileToHivePartMap += (partFile -> partDirToHivePartMap(partition))
+ }
+ }
+ partFiles
+ }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+ }
+
+ if (splitFiles.length == 1) {
+ val path = new Path(splitFiles(0).filePath)
+ if (!isSplitable(path) && splitFiles(0).length >
+ sparkSession.sparkContext.getConf.getOption("spark.io.warning.largeFileThreshold")
+ .getOrElse("1024000000").toLong) {
+ logWarning(s"Loading one large unsplittable file ${path.toString} with only one " +
+ s"partition, the reason is: ${getFileUnSplittableReason(path)}")
+ }
+ }
+
+ FilePartition.getFilePartitions(sparkSession, splitFiles, maxSplitBytes)
+ }
+
+ private def addCatalogTableConfToConf(hiveConf: Configuration, table: CatalogTable): Unit = {
+ table.properties.foreach {
+ case (key, value) =>
+ hiveConf.set(key, value)
+ }
+ }
+
+ private def normalizeName(name: String): String = {
+ if (isCaseSensitive) {
+ name
+ } else {
+ name.toLowerCase(Locale.ROOT)
+ }
+ }
+
+ def toAttributes(structType: StructType): Seq[AttributeReference] =
+ structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala
new file mode 100644
index 000000000..7297af238
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveScanBuilder.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.types.StructType
+
+case class HiveScanBuilder(
+ sparkSession: SparkSession,
+ fileIndex: HiveCatalogFileIndex,
+ dataSchema: StructType,
+ table: CatalogTable)
+ extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+
+ override def build(): Scan = {
+ HiveScan(
+ sparkSession = sparkSession,
+ fileIndex = fileIndex,
+ catalogTable = table,
+ dataSchema = dataSchema,
+ readDataSchema = readDataSchema(),
+ readPartitionSchema = readPartitionSchema())
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
new file mode 100644
index 000000000..3305a830e
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTable.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class HiveTable(
+ sparkSession: SparkSession,
+ catalogTable: CatalogTable,
+ hiveTableCatalog: HiveTableCatalog)
+ extends Table with SupportsRead with SupportsWrite with Logging {
+
+ lazy val dataSchema: StructType = catalogTable.dataSchema
+
+ lazy val partitionSchema: StructType = catalogTable.partitionSchema
+
+ def rootPaths: Seq[Path] = catalogTable.storage.locationUri.map(new Path(_)).toSeq
+
+ lazy val fileIndex: HiveCatalogFileIndex = {
+ val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
+ new HiveCatalogFileIndex(
+ sparkSession,
+ catalogTable,
+ hiveTableCatalog,
+ catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+ }
+
+ override def name(): String = catalogTable.identifier.table
+
+ override def schema(): StructType = catalogTable.schema
+
+ override def properties(): util.Map[String, String] = catalogTable.properties.asJava
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ HiveScanBuilder(sparkSession, fileIndex, dataSchema, catalogTable)
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ // TODO: Support the write code path for Hive V2.
+ throw new UnsupportedOperationException("Write is not supported for the Hive V2 connector yet.")
+ }
+
+ override def capabilities(): util.Set[TableCapability] = {
+ util.EnumSet.of(BATCH_READ)
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
new file mode 100644
index 000000000..848d5be4b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import java.net.URI
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{catalogV2Util, postExternalCatalogEvent, HiveExternalCatalog, HiveMetastoreCatalog, HiveSessionCatalog}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+
+/**
+ * A [[TableCatalog]] that wraps HiveExternalCatalog as a V2 CatalogPlugin instance to access Hive.
+ */
+class HiveTableCatalog(sparkSession: SparkSession)
+ extends TableCatalog with SQLConfHelper with SupportsNamespaces with Logging {
+
+ def this() = this(SparkSession.active)
+
+ private val sc = sparkSession.sparkContext
+
+ private val sessionState = sparkSession.sessionState
+
+ private var catalogName: String = _
+
+ private var catalogOptions: CaseInsensitiveStringMap = _
+
+ var catalog: HiveSessionCatalog = _
+
+ val NAMESPACE_RESERVED_PROPERTIES =
+ Seq(
+ SupportsNamespaces.PROP_COMMENT,
+ SupportsNamespaces.PROP_LOCATION,
+ SupportsNamespaces.PROP_OWNER)
+
+ private lazy val hadoopConf: Configuration = {
+ val conf = sparkSession.sessionState.newHadoopConf()
+ catalogOptions.asScala.foreach {
+ case (k, v) => conf.set(k, v)
+ case _ =>
+ }
+ conf
+ }
+
+ private lazy val sparkConf: SparkConf = {
+ val conf = sparkSession.sparkContext.getConf
+ catalogOptions.asScala.foreach {
+ case (k, v) => conf.set(k, v)
+ }
+ conf
+ }
+
+ def hadoopConfiguration(): Configuration = hadoopConf
+
+ override def name(): String = {
+ require(catalogName != null, "The Hive table catalog is not initialized")
+ catalogName
+ }
+
+ override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
+ assert(catalogName == null, "The Hive table catalog is already initialized.")
+ assert(
+ conf.getConf(CATALOG_IMPLEMENTATION) == "hive",
+ s"Require setting ${CATALOG_IMPLEMENTATION.key} to `hive` to enable hive support.")
+ catalogName = name
+ catalogOptions = options
+ catalog = new HiveSessionCatalog(
+ externalCatalogBuilder = () => externalCatalog,
+ globalTempViewManagerBuilder = () => sparkSession.sharedState.globalTempViewManager,
+ metastoreCatalog = new HiveMetastoreCatalog(sparkSession),
+ functionRegistry = sessionState.functionRegistry,
+ tableFunctionRegistry = sessionState.tableFunctionRegistry,
+ hadoopConf = hadoopConf,
+ parser = sessionState.sqlParser,
+ functionResourceLoader = sessionState.resourceLoader,
+ HiveUDFExpressionBuilder)
+ }
+
+ /**
+ * A catalog that interacts with external systems.
+ */
+ lazy val externalCatalog: ExternalCatalogWithListener = {
+ val externalCatalog = new HiveExternalCatalog(sparkConf, hadoopConf)
+
+ // Wrap to provide catalog events
+ val wrapped = new ExternalCatalogWithListener(externalCatalog)
+
+ // Make sure we propagate external catalog events to the spark listener bus
+ wrapped.addListener((event: ExternalCatalogEvent) => postExternalCatalogEvent(sc, event))
+
+ wrapped
+ }
+
+ override val defaultNamespace: Array[String] = Array("default")
+
+ override def listTables(namespace: Array[String]): Array[Identifier] = {
+ namespace match {
+ case Array(db) =>
+ catalog
+ .listTables(db)
+ .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
+ .toArray
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def loadTable(ident: Identifier): Table = {
+ HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
+ }
+
+ override def createTable(
+ ident: Identifier,
+ schema: StructType,
+ partitions: Array[Transform],
+ properties: util.Map[String, String]): Table = {
+ import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.TransformHelper
+ val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
+ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+ val tableProperties = properties.asScala
+ val location = Option(properties.get(TableCatalog.PROP_LOCATION))
+ val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
+ .copy(locationUri = location.map(CatalogUtils.stringToURI))
+ val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
+ val tableType =
+ if (isExternal || location.isDefined) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+
+ val tableDesc = CatalogTable(
+ identifier = ident.asTableIdentifier,
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ provider = Some(provider),
+ partitionColumnNames = partitionColumns,
+ bucketSpec = maybeBucketSpec,
+ properties = tableProperties.toMap,
+ tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
+ comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
+
+ try {
+ catalog.createTable(tableDesc, ignoreIfExists = false)
+ } catch {
+ case _: TableAlreadyExistsException =>
+ throw new TableAlreadyExistsException(ident)
+ }
+
+ loadTable(ident)
+ }
+
+ override def alterTable(ident: Identifier, changes: TableChange*): Table = {
+ val catalogTable =
+ try {
+ catalog.getTableMetadata(ident.asTableIdentifier)
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ val properties = catalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
+ val schema = catalogV2Util.applySchemaChanges(
+ catalogTable.schema,
+ changes)
+ val comment = properties.get(TableCatalog.PROP_COMMENT)
+ val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+ val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+ val storage =
+ if (location.isDefined) {
+ catalogTable.storage.copy(locationUri = location)
+ } else {
+ catalogTable.storage
+ }
+
+ try {
+ catalog.alterTable(
+ catalogTable.copy(
+ properties = properties,
+ schema = schema,
+ owner = owner,
+ comment = comment,
+ storage = storage))
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ loadTable(ident)
+ }
+
+ override def dropTable(ident: Identifier): Boolean = {
+ try {
+ if (loadTable(ident) != null) {
+ catalog.dropTable(
+ ident.asTableIdentifier,
+ ignoreIfNotExists = true,
+ purge = true /* skip HDFS trash */ )
+ true
+ } else {
+ false
+ }
+ } catch {
+ case _: NoSuchTableException =>
+ false
+ }
+ }
+
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
+ if (tableExists(newIdent)) {
+ throw new TableAlreadyExistsException(newIdent)
+ }
+
+ // Load table to make sure the table exists
+ loadTable(oldIdent)
+ catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+ }
+
+ private def toOptions(properties: Map[String, String]): Map[String, String] = {
+ properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+ case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
+ }.toMap
+ }
+
+ override def listNamespaces(): Array[Array[String]] = {
+ catalog.listDatabases().map(Array(_)).toArray
+ }
+
+ override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
+ namespace match {
+ case Array() =>
+ listNamespaces()
+ case Array(db) if catalog.databaseExists(db) =>
+ Array()
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
+ namespace match {
+ case Array(db) =>
+ try {
+ catalog.getDatabaseMetadata(db).toMetadata
+ } catch {
+ case _: NoSuchDatabaseException =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ override def createNamespace(
+ namespace: Array[String],
+ metadata: util.Map[String, String]): Unit = namespace match {
+ case Array(db) if !catalog.databaseExists(db) =>
+ catalog.createDatabase(
+ toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
+ ignoreIfExists = false)
+
+ case Array(_) =>
+ throw new NamespaceAlreadyExistsException(namespace)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+ }
+
+ override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
+ namespace match {
+ case Array(db) =>
+ // validate that this catalog's reserved properties are not removed
+ changes.foreach {
+ case remove: RemoveProperty if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
+ throw new UnsupportedOperationException(
+ s"Cannot remove reserved property: ${remove.property}")
+ case _ =>
+ }
+
+ val metadata = catalog.getDatabaseMetadata(db).toMetadata
+ catalog.alterDatabase(
+ toCatalogDatabase(db, catalogV2Util.applyNamespaceChanges(metadata, changes)))
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
+
+ /**
+ * List the metadata of partitions that belong to the specified table, assuming it exists, that
+ * satisfy the given partition-pruning predicate expressions.
+ */
+ def listPartitionsByFilter(
+ tableName: TableIdentifier,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+ catalog.listPartitionsByFilter(tableName, predicates)
+ }
+
+ def listPartitions(
+ tableName: TableIdentifier,
+ partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
+ catalog.listPartitions(tableName, partialSpec)
+ }
+
+ override def dropNamespace(
+ namespace: Array[String],
+ cascade: Boolean): Boolean = namespace match {
+ case Array(db) if catalog.databaseExists(db) =>
+ if (catalog.listTables(db).nonEmpty) {
+ throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+ }
+ catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
+ true
+
+ case Array(_) =>
+ // exists returned false
+ false
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+}
+
+private object HiveTableCatalog {
+ private def toCatalogDatabase(
+ db: String,
+ metadata: util.Map[String, String],
+ defaultLocation: Option[URI] = None): CatalogDatabase = {
+ CatalogDatabase(
+ name = db,
+ description = metadata.getOrDefault(SupportsNamespaces.PROP_COMMENT, ""),
+ locationUri = Option(metadata.get(SupportsNamespaces.PROP_LOCATION))
+ .map(CatalogUtils.stringToURI)
+ .orElse(defaultLocation)
+ .getOrElse(throw new IllegalArgumentException("Missing database location")),
+ properties = metadata.asScala.toMap --
+ Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION))
+ }
+
+ implicit class NamespaceHelper(namespace: Array[String]) {
+ def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
+ }
+
+ implicit class IdentifierHelper(ident: Identifier) {
+ def quoted: String = {
+ if (ident.namespace.nonEmpty) {
+ ident.namespace.map(quoteIfNeeded).mkString(".") + "." + quoteIfNeeded(ident.name)
+ } else {
+ quoteIfNeeded(ident.name)
+ }
+ }
+
+ def asMultipartIdentifier: Seq[String] = ident.namespace :+ ident.name
+
+ def asTableIdentifier: TableIdentifier = ident.namespace match {
+ case ns if ns.isEmpty => TableIdentifier(ident.name)
+ case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
+ case _ =>
+ throw KyuubiHiveConnectorException(
+ s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.")
+ }
+ }
+
+ implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
+ def toMetadata: util.Map[String, String] = {
+ val metadata = mutable.HashMap[String, String]()
+
+ catalogDatabase.properties.foreach {
+ case (key, value) => metadata.put(key, value)
+ }
+ metadata.put(SupportsNamespaces.PROP_LOCATION, catalogDatabase.locationUri.toString)
+ metadata.put(SupportsNamespaces.PROP_COMMENT, catalogDatabase.description)
+
+ metadata.asJava
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala
new file mode 100644
index 000000000..7cdcc211f
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorException.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+case class KyuubiHiveConnectorException(
+ message: String,
+ cause: Option[Throwable] = None) extends Exception(message, cause.orNull)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
new file mode 100644
index 000000000..fe8564ca8
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/FilePartitionReader.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.io.{FileNotFoundException, IOException}
+
+import org.apache.parquet.io.ParquetDecodingException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.inputFileBlockHolder
+import org.apache.spark.sql.internal.SQLConf
+
+// scalastyle:off line.size.limit
+// copy from https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+// scalastyle:on line.size.limit
+class FilePartitionReader[T](readers: Iterator[HivePartitionedFileReader[T]])
+ extends PartitionReader[T] with Logging {
+ private var currentReader: HivePartitionedFileReader[T] = null
+
+ private val sqlConf = SQLConf.get
+ private def ignoreMissingFiles = sqlConf.ignoreMissingFiles
+ private def ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
+
+ override def next(): Boolean = {
+ if (currentReader == null) {
+ if (readers.hasNext) {
+ try {
+ currentReader = getNextReader()
+ } catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file.", e)
+ currentReader = null
+ // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+ case e: FileNotFoundException if !ignoreMissingFiles =>
+ throw fileNotFoundError(e)
+ case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+ logWarning(
+ s"Skipped the rest of the content in the corrupted file.",
+ e)
+ currentReader = null
+ }
+ } else {
+ return false
+ }
+ }
+
+ // In PartitionReader.next(), the current reader proceeds to next record.
+ // It might throw RuntimeException/IOException and Spark should handle these exceptions.
+ val hasNext =
+ try {
+ currentReader != null && currentReader.next()
+ } catch {
+ case e @ (_: SchemaColumnConvertNotSupportedException
+ | _: ParquetDecodingException) =>
+ throw e
+ case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+ logWarning(
+ s"Skipped the rest of the content in the corrupted file: $currentReader",
+ e)
+ false
+ }
+ if (hasNext) {
+ true
+ } else {
+ close()
+ currentReader = null
+ next()
+ }
+ }
+
+ override def get(): T = currentReader.get()
+
+ override def close(): Unit = {
+ if (currentReader != null) {
+ currentReader.close()
+ }
+ inputFileBlockHolder.unset()
+ }
+
+ private def getNextReader(): HivePartitionedFileReader[T] = {
+ val reader = readers.next()
+ logInfo(s"Reading file $reader")
+ // Sets InputFileBlockHolder for the file block's information
+ val file = reader.file
+ inputFileBlockHolder.set(file.filePath, file.start, file.length)
+ reader
+ }
+
+ def fileNotFoundError(e: FileNotFoundException): Throwable = {
+ new FileNotFoundException(
+ e.getMessage + "\n" +
+ "It is possible the underlying files have been updated. " +
+ "You can explicitly invalidate the cache in Spark by " +
+ "recreating the Dataset/DataFrame involved.")
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
new file mode 100644
index 000000000..416fdb2eb
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionReaderFactory.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.NextIterator
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types._
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.kyuubi.spark.connector.hive.HiveReader
+
+case class HivePartitionReaderFactory(
+ sqlConf: SQLConf,
+ broadcastHiveConf: Broadcast[SerializableConfiguration],
+ hiveTable: HiveTable,
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ partitionSchema: StructType,
+ partFileToHivePart: Map[PartitionedFile, HivePartition],
+ pushedFilters: Array[Filter] = Array.empty)
+ extends FilePartitionReaderFactory with Logging {
+
+ private val charset: String =
+ sqlConf.getConfString("hive.exec.default.charset", "utf-8")
+
+ val tableDesc = HiveReader.getTableDec(hiveTable)
+ val nonPartitionReadDataKeys = HiveReader.toAttributes(readDataSchema)
+
+ override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+ throw new UnsupportedOperationException("Cannot use buildReader directly.")
+ }
+
+ override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+ assert(partition.isInstanceOf[FilePartition])
+ val filePartition = partition.asInstanceOf[FilePartition]
+ val iter: Iterator[HivePartitionedFileReader[InternalRow]] =
+ filePartition.files.toIterator.map { file =>
+ val bindHivePart = partFileToHivePart.get(file).orNull
+ HivePartitionedFileReader(
+ file,
+ new PartitionReaderWithPartitionValues(
+ HivePartitionedReader(
+ file,
+ buildReaderInternal(file, bindHivePart),
+ tableDesc,
+ broadcastHiveConf,
+ nonPartitionReadDataKeys,
+ bindHivePart,
+ charset),
+ readDataSchema,
+ partitionSchema,
+ file.partitionValues))
+ }
+ new FilePartitionReader[InternalRow](iter)
+ }
+
+ def buildReaderInternal(
+ file: PartitionedFile,
+ bindPartition: HivePartition): PartitionReader[Writable] = {
+ val reader = createPartitionWritableReader(file, bindPartition)
+ val fileReader = new PartitionReader[Writable] {
+ override def next(): Boolean = reader.hasNext
+ override def get(): Writable = reader.next()
+ override def close(): Unit = {}
+ }
+ fileReader
+ }
+
+ private def createPartitionWritableReader(
+ file: PartitionedFile,
+ bindPartition: HivePartition): Iterator[Writable] = {
+ // Derive the partition descriptor from the Hive partition bound to this file, if any
+ val partDesc =
+ if (bindPartition != null) {
+ Utilities.getPartitionDesc(bindPartition)
+ } else null
+
+ val ifc =
+ if (partDesc == null) {
+ hiveTable.getInputFormatClass
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ } else {
+ partDesc.getInputFileFormatClass
+ .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+ }
+
+ val jobConf = new JobConf(broadcastHiveConf.value.value)
+
+ val filePath = new Path(new URI(file.filePath))
+
+ if (tableDesc != null) {
+ configureJobPropertiesForStorageHandler(tableDesc, jobConf, true)
+ Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
+ }
+
+ val eventIter: NextIterator[Writable] = new NextIterator[Writable] {
+ // Initialize the 'FileSplit' and 'InputFormat' instances used to construct the record reader
+ private val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+ private val inputFormat: InputFormat[Writable, Writable] =
+ HiveReader.getInputFormat(ifc, jobConf)
+ logDebug(s"Final input format $inputFormat")
+ var reader: RecordReader[Writable, Writable] =
+ try {
+ inputFormat.getRecordReader(fileSplit.asInstanceOf[InputSplit], jobConf, Reporter.NULL)
+ } catch {
+ case e: Exception =>
+ logError("Exception raised when creating iterator reader", e)
+ throw e
+ }
+ val key: Writable = if (reader == null) null else reader.createKey()
+ val value: Writable = if (reader == null) null else reader.createValue()
+
+ override def getNext(): Writable = {
+ try {
+ finished = !reader.next(key, value)
+ } catch {
+ case e: Exception =>
+ logError(s"Exception raised when reading corrupt file: $fileSplit", e)
+ throw e
+ }
+ value
+ }
+
+ override def close(): Unit = {
+ if (reader != null) {
+ try {
+ reader.close()
+ } catch {
+ case e: Throwable =>
+ logError("Exception in RecordReader.close()", e)
+ } finally {
+ reader = null
+ }
+ }
+ }
+ }
+
+ Option(TaskContext.get())
+ .foreach(_.addTaskCompletionListener[Unit](_ => eventIter.closeIfNeeded()))
+ eventIter
+ }
+
+ def configureJobPropertiesForStorageHandler(
+ tableDesc: TableDesc,
+ conf: Configuration,
+ input: Boolean): Unit = {
+ val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
+ val storageHandler =
+ org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property)
+ if (storageHandler != null) {
+ val jobProperties = new java.util.LinkedHashMap[String, String]
+ if (input) {
+ storageHandler.configureInputJobProperties(tableDesc, jobProperties)
+ } else {
+ storageHandler.configureOutputJobProperties(tableDesc, jobProperties)
+ }
+ if (!jobProperties.isEmpty) {
+ tableDesc.setJobProperties(jobProperties)
+ }
+ }
+ }
+}
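
For context, the broadcastHiveConf this factory receives is produced once
on the driver and shared with every executor-side reader. A minimal
sketch, assuming Spark 3.x where SerializableConfiguration is public
(variable names are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.SerializableConfiguration

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    // Broadcast once; each task reads its own deserialized copy
    val broadcastHiveConf =
      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
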
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala
new file mode 100644
index 000000000..fa0adf96b
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedFileReader.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+
+case class HivePartitionedFileReader[T](
+ file: PartitionedFile,
+ reader: PartitionReader[T]) extends PartitionReader[T] {
+ override def next(): Boolean = reader.next()
+
+ override def get(): T = reader.get()
+
+ override def close(): Unit = reader.close()
+
+ override def toString: String = file.toString
+}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
new file mode 100644
index 000000000..108df65d4
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HivePartitionedReader.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive.read
+
+import java.util.Properties
+
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.io.Writable
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.{hadoopTableReader, hiveShim}
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SerializableConfiguration
+
+case class HivePartitionedReader(
+ file: PartitionedFile,
+ reader: PartitionReader[Writable],
+ tableDesc: TableDesc,
+ broadcastHiveConf: Broadcast[SerializableConfiguration],
+ nonPartitionReadDataKeys: Seq[Attribute],
+ bindPartition: HivePartition,
+ charset: String = "utf-8") extends PartitionReader[InternalRow] with Logging {
+
+ private val partDesc =
+ if (bindPartition != null) {
+ Utilities.getPartitionDesc(bindPartition)
+ } else null
+ private val hiveConf = broadcastHiveConf.value.value
+
+ private val tableDeser = tableDesc.getDeserializerClass.newInstance()
+ tableDeser.initialize(hiveConf, tableDesc.getProperties)
+
+ private val localDeser: Deserializer =
+ if (bindPartition != null &&
+ bindPartition.getDeserializer != null) {
+ val tableProperties = tableDesc.getProperties
+ val props = new Properties(tableProperties)
+ val deserializer =
+ bindPartition.getDeserializer.getClass.asInstanceOf[Class[Deserializer]].newInstance()
+ deserializer.initialize(hiveConf, props)
+ deserializer
+ } else {
+ tableDeser
+ }
+
+ private val internalRow = new SpecificInternalRow(nonPartitionReadDataKeys.map(_.dataType))
+
+ private val soi: StructObjectInspector =
+ if (localDeser.getObjectInspector.equals(tableDeser.getObjectInspector)) {
+ localDeser.getObjectInspector.asInstanceOf[StructObjectInspector]
+ } else {
+ ObjectInspectorConverters.getConvertedOI(
+ localDeser.getObjectInspector,
+ tableDeser.getObjectInspector).asInstanceOf[StructObjectInspector]
+ }
+
+ private val (fieldRefs, fieldOrdinals) = nonPartitionReadDataKeys.zipWithIndex
+ .map { case (attr, ordinal) =>
+ soi.getStructFieldRef(attr.name) -> ordinal
+ }.toArray.unzip
+
+ private val unwrappers: Seq[(Any, InternalRow, Int) => Unit] = fieldRefs.map {
+ _.getFieldObjectInspector match {
+ case oi: BooleanObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+ case oi: ByteObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+ case oi: ShortObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+ case oi: IntObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+ case oi: LongObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+ case oi: FloatObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+ case oi: DoubleObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+ case oi: HiveVarcharObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
+ case oi: HiveCharObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.update(ordinal, UTF8String.fromString(oi.getPrimitiveJavaObject(value).getValue))
+ case oi: HiveDecimalObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.update(ordinal, hiveShim.toCatalystDecimal(oi, value))
+ case oi: TimestampObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value)))
+ case oi: DateObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value)))
+ case oi: BinaryObjectInspector =>
+ (value: Any, row: InternalRow, ordinal: Int) =>
+ row.update(ordinal, oi.getPrimitiveJavaObject(value))
+ case oi =>
+ logDebug("HiveInspector class: " + oi.getClass.getName + ", charset: " + charset)
+ val unwrapper = hadoopTableReader.unwrapperFor(oi)
+ (value: Any, row: InternalRow, ordinal: Int) => row(ordinal) = unwrapper(value)
+ }
+ }
+
+ private val converter =
+ ObjectInspectorConverters.getConverter(localDeser.getObjectInspector, soi)
+
+ private def fillObject(value: Writable, mutableRow: InternalRow): InternalRow = {
+ val raw = converter.convert(localDeser.deserialize(value))
+ var i = 0
+ val length = fieldRefs.length
+ while (i < length) {
+ val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+ if (fieldValue == null) {
+ mutableRow.setNullAt(fieldOrdinals(i))
+ } else {
+ unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ }
+ i += 1
+ }
+ mutableRow
+ }
+
+ override def next(): Boolean = reader.next()
+
+ override def get(): InternalRow = fillObject(reader.get(), internalRow)
+
+ override def close(): Unit = reader.close()
+
+ override def toString: String = file.toString
+}
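
The unwrapper table above follows the usual Hive ObjectInspector pattern:
an inspector extracts a JVM value from the deserialized field, and the
reader writes it into the reused mutable row by ordinal. A self-contained
sketch of the int branch:

    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
    import org.apache.hadoop.io.IntWritable
    import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
    import org.apache.spark.sql.types.IntegerType

    val oi = PrimitiveObjectInspectorFactory.writableIntObjectInspector
    val row = new SpecificInternalRow(Seq(IntegerType))
    // Mirrors the IntObjectInspector case: unwrap, then set by ordinal
    row.setInt(0, oi.get(new IntWritable(42)))
    assert(row.getInt(0) == 42)
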
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala
new file mode 100644
index 000000000..19257f0f6
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/spark/sql/hive/kyuubi/connector/HiveConnectorHelper.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.kyuubi.connector
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, ExternalCatalogEvent}
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
+import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim, HiveTableUtil}
+import org.apache.spark.sql.hive.client.HiveClientImpl
+
+object HiveConnectorHelper {
+
+ type HiveSessionCatalog = org.apache.spark.sql.hive.HiveSessionCatalog
+ type HiveMetastoreCatalog = org.apache.spark.sql.hive.HiveMetastoreCatalog
+ type HiveExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog
+ type BucketSpecHelper = org.apache.spark.sql.connector.catalog.CatalogV2Implicits.BucketSpecHelper
+ type NextIterator[U] = org.apache.spark.util.NextIterator[U]
+ val logicalExpressions: LogicalExpressions.type = LogicalExpressions
+ val hiveClientImpl: HiveClientImpl.type = HiveClientImpl
+ val catalogV2Util: CatalogV2Util.type = CatalogV2Util
+ val hiveTableUtil: HiveTableUtil.type = HiveTableUtil
+ val hiveShim: HiveShim.type = HiveShim
+ val inputFileBlockHolder: InputFileBlockHolder.type = InputFileBlockHolder
+ val hadoopTableReader: HadoopTableReader.type = HadoopTableReader
+
+ def postExternalCatalogEvent(sc: SparkContext, event: ExternalCatalogEvent): Unit = {
+ sc.listenerBus.post(event)
+ }
+
+ implicit class TransformHelper(transforms: Seq[Transform]) {
+ def convertTransforms: (Seq[String], Option[BucketSpec]) = {
+ val identityCols = new mutable.ArrayBuffer[String]
+ var bucketSpec = Option.empty[BucketSpec]
+
+ transforms.foreach {
+ case IdentityTransform(FieldReference(Seq(col))) =>
+ identityCols += col
+
+ case BucketTransform(numBuckets, col, sortCol) =>
+ if (bucketSpec.nonEmpty) {
+ throw new UnsupportedOperationException("Multiple bucket transforms are not supported.")
+ }
+ if (sortCol.isEmpty) {
+ bucketSpec = Some(BucketSpec(numBuckets, col.map(_.fieldNames.mkString(".")), Nil))
+ } else {
+ bucketSpec = Some(BucketSpec(
+ numBuckets,
+ col.map(_.fieldNames.mkString(".")),
+ sortCol.map(_.fieldNames.mkString("."))))
+ }
+
+ case transform =>
+ throw new UnsupportedOperationException(
+ s"Unsupported partition transform: $transform")
+ }
+
+ (identityCols.toSeq, bucketSpec)
+ }
+ }
+
+}
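
As a usage sketch, the TransformHelper implicit splits DSv2 transforms
into Hive-style partition columns plus an optional bucket spec (the
column names here are illustrative):

    import org.apache.spark.sql.connector.expressions.Expressions
    import org.apache.spark.sql.hive.kyuubi.connector.HiveConnectorHelper.TransformHelper

    val transforms = Seq(Expressions.identity("date"), Expressions.bucket(4, "id"))
    val (partitionCols, bucketSpec) = transforms.convertTransforms
    // partitionCols == Seq("date")
    // bucketSpec == Some(BucketSpec(4, Seq("id"), Nil))
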
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
new file mode 100644
index 000000000..f56b7577d
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
+
+class HiveCatalogSuite extends KyuubiFunSuite {
+
+ test("get catalog name") {
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "hive")
+ .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ val catalog = new HiveTableCatalog
+ val catalogName = "v2hive"
+ catalog.initialize(catalogName, CaseInsensitiveStringMap.empty())
+ assert(catalog.name() == catalogName)
+ }
+ }
+
+ test("supports namespaces") {
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "hive")
+ .set("spark.sql.catalog.v2hive_namespaces", classOf[HiveTableCatalog].getName)
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ try {
+ spark.sql("USE v2hive_namespaces")
+ spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1")
+ assert(spark.sql(s"SHOW NAMESPACES").collect().length == 2)
+ } finally {
+ spark.sql("DROP NAMESPACE IF EXISTS ns1")
+ }
+ }
+ }
+
+ test("nonexistent table") {
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "hive")
+ .set("spark.sql.catalog.v2hive", classOf[HiveTableCatalog].getName)
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ val exception = intercept[AnalysisException] {
+ spark.table("v2hive.ns1.nonexistent_table")
+ }
+ assert(exception.message === "Table or view not found: v2hive.ns1.nonexistent_table")
+ }
+ }
+}
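
Outside the test harness, wiring the connector into a user session
follows the same pattern the suite uses; a minimal sketch (the catalog
name hive_v2 is arbitrary):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.catalogImplementation", "hive")
      .config("spark.sql.catalog.hive_v2",
        "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
      .getOrCreate()

    spark.sql("SHOW NAMESPACES IN hive_v2").show()
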
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala
new file mode 100644
index 000000000..090558383
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorSuite.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.hive
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.spark.connector.common.LocalSparkSession.withSparkSession
+
+class HiveConnectorSuite extends KyuubiFunSuite {
+
+ def withTempTable(spark: SparkSession, table: String)(f: => Unit): Unit = {
+ spark.sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS
+ | $table (id String, date String)
+ | USING PARQUET
+ | PARTITIONED BY (date)
+ |""".stripMargin).collect()
+ try f
+ finally spark.sql(s"DROP TABLE $table")
+ }
+
+ test("simple query") {
+ val sparkConf = new SparkConf()
+ .setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "hive")
+ .set("spark.sql.catalog.hivev2", classOf[HiveTableCatalog].getName)
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
+ val table = "default.employee"
+ withTempTable(spark, table) {
+ spark.sql(
+ s"""
+ | INSERT INTO
+ | $table
+ | VALUES("yi", "2022-08-08")
+ |""".stripMargin).collect()
+
+ // can query an existing Hive table via a three-part identifier: catalog.database.table
+ val result = spark.sql(
+ s"""
+ | SELECT * FROM hivev2.$table
+ |""".stripMargin)
+ assert(result.collect().head == Row("yi", "2022-08-08"))
+
+ // the error message should contain catalog info if the table does not exist
+ val e = intercept[AnalysisException] {
+ spark.sql(
+ s"""
+ | SELECT * FROM hivev2.ns1.tb1
+ |""".stripMargin)
+ }
+ assert(e.getMessage().contains("Table or view not found: hivev2.ns1.tb1"))
+ }
+ }
+ }
+}
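
Continuing the session from the previous sketch, the federated-query
motivation looks like this: one statement joins the v2 Hive catalog
against the default session catalog (both table names are hypothetical):

    spark.sql(
      """
        |SELECT e.id, s.score
        |FROM hive_v2.default.employee e
        |JOIN spark_catalog.default.scores s ON e.id = s.id
        |""".stripMargin).show()
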
diff --git a/pom.xml b/pom.xml
index cb81f5fe4..ea6484897 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2185,6 +2185,7 @@
<modules>
<module>extensions/spark/kyuubi-extension-spark-common</module>
<module>extensions/spark/kyuubi-extension-spark-3-3</module>
+ <module>extensions/spark/kyuubi-spark-connector-hive</module>
<module>extensions/spark/kyuubi-spark-connector-kudu</module>
</modules>
</profile>