You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/10 12:16:38 UTC
[GitHub] [hudi] melin commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory
melin commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1122313372
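Workaround (a hack): capture Spark's default session catalog with an AspectJ aspect, then register a Hudi catalog that delegates non-Hudi operations to it, so Hudi no longer has to be installed as `spark_catalog`.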
```java
/**
 * AspectJ hook that captures Spark's default session catalog so that
 * {@code SuperiorHoodieCatalog} can delegate to it while being registered under
 * its own catalog name.
 *
 * <p>The advice runs around every {@code CatalogManager} constructor execution and
 * stores the first constructor argument in the static
 * {@code SuperiorHoodieCatalog.defaultSessionCatalog} field. Because that field is a
 * single static slot, the most recently constructed {@code CatalogManager} wins when
 * several are created in the same JVM.
 */
@Aspect
public class CatalogManagerAspectj {
    private static final Logger LOG = LoggerFactory.getLogger(CatalogManagerAspectj.class);

    /**
     * Records the session catalog handed to the {@code CatalogManager} constructor, then
     * lets construction proceed unchanged.
     *
     * @param pjp the constructor-execution join point
     * @throws Throwable whatever the constructor itself throws
     */
    @Around("execution(org.apache.spark.sql.connector.catalog.CatalogManager.new(..))")
    public void aroundCatalogManagerInit(ProceedingJoinPoint pjp) throws Throwable {
        Object[] args = pjp.getArgs();
        // NOTE(review): assumes the first constructor parameter is the default session
        // catalog (as in Spark 3.x) — re-check on Spark upgrades.
        Object candidate = args.length > 0 ? args[0] : null;
        if (candidate instanceof CatalogPlugin) {
            SuperiorHoodieCatalog.defaultSessionCatalog_$eq((CatalogPlugin) candidate);
            LOG.info("Captured default session catalog {}", candidate.getClass().getName());
        } else {
            // Do not break CatalogManager construction over an unexpected signature; keep the
            // previously captured catalog (if any) and make the problem visible instead.
            LOG.warn("CatalogManager constructor argument 0 is not a CatalogPlugin ({}); "
                    + "SuperiorHoodieCatalog.defaultSessionCatalog was not updated", candidate);
        }
        pjp.proceed();
    }
}
```
```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.hudi.catalog.SuperiorHoodieCatalog.defaultSessionCatalog
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.types.{StructField, StructType}
import java.util
/**
 * A Hudi-aware catalog that can be registered under its own name instead of replacing
 * Spark's built-in `spark_catalog`.
 *
 * Hudi tables are handled by Hudi's staged tables and commands. All other tables, and every
 * namespace operation, are delegated to the Spark session catalog held in
 * `SuperiorHoodieCatalog.defaultSessionCatalog`, which must be set (e.g. by the
 * `CatalogManagerAspectj` advice) before this catalog is used.
 */
class SuperiorHoodieCatalog extends HoodieCatalog {

  // NOTE(review): hard-coded instead of using the name Spark registered the catalog under,
  // so the catalog presumably has to be configured as `hudi` — confirm.
  override def name: String = "hudi"

  /** Default namespace of the delegated session catalog. */
  override def defaultNamespace: Array[String] = requireSessionCatalog.defaultNamespace

  /**
   * Stages a CREATE TABLE. Hudi tables get a `HoodieStagedTable`; any other table is created
   * in the session catalog right away and wrapped in a `BasicStagedTable`.
   */
  override def stageCreate(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
    } else {
      BasicStagedTable(ident, asTableCatalog.createTable(ident, schema, partitions, properties), this)
    }
  }

  /**
   * Stages a REPLACE TABLE. For non-Hudi tables the existing table is dropped from the session
   * catalog and recreated right away.
   */
  override def stageReplace(ident: Identifier,
                            schema: StructType,
                            partitions: Array[Transform],
                            properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
    } else {
      asTableCatalog.dropTable(ident)
      BasicStagedTable(ident, asTableCatalog.createTable(ident, schema, partitions, properties), this)
    }
  }

  /**
   * Stages a CREATE OR REPLACE TABLE. For non-Hudi tables a missing table is ignored when
   * dropping, then the table is (re)created in the session catalog right away.
   */
  override def stageCreateOrReplace(ident: Identifier,
                                    schema: StructType,
                                    partitions: Array[Transform],
                                    properties: util.Map[String, String]): StagedTable = {
    if (sparkAdapter.isHoodieTable(properties)) {
      HoodieStagedTable(
        ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
    } else {
      try asTableCatalog.dropTable(ident) catch {
        case _: NoSuchTableException => // nothing to replace
      }
      BasicStagedTable(ident, asTableCatalog.createTable(ident, schema, partitions, properties), this)
    }
  }

  /**
   * Loads a table from the session catalog, wrapping V1 tables that Hudi recognises in a
   * `HoodieInternalV2Table`. Exceptions from the session catalog propagate unchanged.
   */
  override def loadTable(ident: Identifier): Table =
    asTableCatalog.loadTable(ident) match {
      case v1: V1Table if sparkAdapter.isHoodieTable(v1.catalogTable) =>
        HoodieInternalV2Table(
          spark,
          v1.catalogTable.location.toString,
          catalogTable = Some(v1.catalogTable),
          tableIdentifier = Some(ident.toString))
      case other => other
    }

  /**
   * Creates a Hudi table.
   *
   * NOTE(review): unlike the stage* methods this does not check
   * `sparkAdapter.isHoodieTable(properties)`, so every table created through this path becomes
   * a Hudi table — confirm this is intended.
   */
  override def createTable(ident: Identifier,
                           schema: StructType,
                           partitions: Array[Transform],
                           properties: util.Map[String, String]): Table = {
    createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
  }

  override def tableExists(ident: Identifier): Boolean = asTableCatalog.tableExists(ident)

  /**
   * Drops a table. Hudi tables go through `DropHoodieTableCommand`; others are delegated.
   * Returns false when no table exists, as the `TableCatalog.dropTable` contract requires.
   */
  override def dropTable(ident: Identifier): Boolean = dropOrPurgeTable(ident, purge = false)

  /**
   * Drops and purges a table. Hudi tables go through `DropHoodieTableCommand`; others are
   * delegated. Returns false when no table exists, as `TableCatalog.purgeTable` requires.
   */
  override def purgeTable(ident: Identifier): Boolean = dropOrPurgeTable(ident, purge = true)

  /** Renames a table, using `AlterHoodieTableRenameCommand` for Hudi tables. */
  @throws[NoSuchTableException]
  @throws[TableAlreadyExistsException]
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
    loadTable(oldIdent) match {
      case _: HoodieInternalV2Table =>
        new AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
      case _ => asTableCatalog.renameTable(oldIdent, newIdent)
    }
  }

  /**
   * Alters a table. Hudi tables are changed through Hudi commands and then reloaded; other
   * tables are delegated to the session catalog.
   */
  override def alterTable(ident: Identifier, changes: TableChange*): Table =
    loadTable(ident) match {
      case hoodieTable: HoodieInternalV2Table =>
        applyHoodieTableChanges(ident, hoodieTable, changes)
        loadTable(ident)
      case _ => asTableCatalog.alterTable(ident, changes: _*)
    }

  @throws[NoSuchNamespaceException]
  override def listTables(namespace: Array[String]): Array[Identifier] = asTableCatalog.listTables(namespace)

  override def invalidateTable(ident: Identifier): Unit = {
    asTableCatalog.invalidateTable(ident)
  }

  @throws[NoSuchNamespaceException]
  override def listNamespaces: Array[Array[String]] = asNamespaceCatalog.listNamespaces

  @throws[NoSuchNamespaceException]
  override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
    asNamespaceCatalog.listNamespaces(namespace)

  override def namespaceExists(namespace: Array[String]): Boolean =
    asNamespaceCatalog.namespaceExists(namespace)

  @throws[NoSuchNamespaceException]
  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
    asNamespaceCatalog.loadNamespaceMetadata(namespace)

  @throws[NamespaceAlreadyExistsException]
  override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
    asNamespaceCatalog.createNamespace(namespace, metadata)
  }

  @throws[NoSuchNamespaceException]
  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    asNamespaceCatalog.alterNamespace(namespace, changes: _*)
  }

  @throws[NoSuchNamespaceException]
  override def dropNamespace(namespace: Array[String]): Boolean =
    asNamespaceCatalog.dropNamespace(namespace)

  /** Shared implementation of `dropTable` / `purgeTable`. */
  private def dropOrPurgeTable(ident: Identifier, purge: Boolean): Boolean =
    loadTableIfExists(ident) match {
      case Some(_: HoodieInternalV2Table) =>
        DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = purge).run(spark)
        true
      case Some(_) =>
        if (purge) asTableCatalog.purgeTable(ident) else asTableCatalog.dropTable(ident)
      case None => false
    }

  /** `loadTable` with a missing table mapped to `None`. */
  private def loadTableIfExists(ident: Identifier): Option[Table] =
    try Some(loadTable(ident)) catch {
      case _: NoSuchTableException => None
    }

  /**
   * Applies `changes` to a Hudi table by running the matching Hudi ALTER TABLE commands.
   * Only added columns, column type changes and column comment changes are supported; anything
   * else raises `UnsupportedOperationException`.
   */
  private def applyHoodieTableChanges(ident: Identifier,
                                      table: HoodieInternalV2Table,
                                      changes: Seq[TableChange]): Unit = {
    val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
    changes.groupBy(_.getClass).foreach {
      case (changeClass, added) if changeClass == classOf[AddColumn] =>
        // NOTE(review): only the first name part is used, so nested (dotted) column additions
        // are not added as nested fields — confirm this is acceptable.
        val newFields = added.collect { case add: AddColumn =>
          StructField(add.fieldNames()(0), add.dataType(), add.isNullable)
        }
        AlterHoodieTableAddColumnsCommand(tableIdent, newFields).run(spark)

      case (changeClass, columnChanges) if classOf[ColumnChange].isAssignableFrom(changeClass) =>
        columnChanges.foreach {
          case typeChange: UpdateColumnType =>
            val colName = UnresolvedAttribute(typeChange.fieldNames()).name
            AlterHoodieTableChangeColumnCommand(
              tableIdent, colName, StructField(colName, typeChange.newDataType())).run(spark)
          case commentChange: UpdateColumnComment =>
            val colName = UnresolvedAttribute(commentChange.fieldNames()).name
            val field = table.schema()
              .findNestedField(commentChange.fieldNames(), includeCollections = true,
                spark.sessionState.conf.resolver)
              .map(_._2)
              .getOrElse {
                throw new AnalysisException(
                  s"Couldn't find column $colName in:\n${table.schema().treeString}")
              }
            AlterHoodieTableChangeColumnCommand(
              tableIdent, colName, field.withComment(commentChange.newComment())).run(spark)
          // Any other column change (e.g. rename or drop) previously escaped as a bare
          // MatchError; report it explicitly instead.
          case other =>
            throw new UnsupportedOperationException(
              s"not supported table change: ${other.getClass.getName}")
        }

      case (changeClass, _) =>
        // `changeClass` is already a Class: report its name, not `getClass` (always java.lang.Class).
        throw new UnsupportedOperationException(s"not supported table change: ${changeClass.getName}")
    }
  }

  /** The captured Spark session catalog, failing fast with a clear error if it was never set. */
  private def requireSessionCatalog: CatalogPlugin = {
    val catalog = defaultSessionCatalog
    if (catalog == null) {
      throw new IllegalStateException(
        "SuperiorHoodieCatalog.defaultSessionCatalog is not set; " +
          "the Spark session catalog must be captured before this catalog is used")
    }
    catalog
  }

  private def asTableCatalog: TableCatalog = requireSessionCatalog.asInstanceOf[TableCatalog]

  private def asNamespaceCatalog: SupportsNamespaces =
    requireSessionCatalog.asInstanceOf[SupportsNamespaces]
}
/** Companion holding the Spark session catalog that `SuperiorHoodieCatalog` delegates to. */
object SuperiorHoodieCatalog {
  // Assigned from outside this object (the CatalogManagerAspectj advice writes it through
  // `defaultSessionCatalog_$eq`) and read by every SuperiorHoodieCatalog; null until assigned.
  var defaultSessionCatalog: CatalogPlugin = null
}
```
```xml
<?xml version="1.0" encoding="UTF-8" ?>
<!-- AspectJ load-time-weaving configuration for the CatalogManagerAspectj hook
     (presumably packaged as META-INF/aop.xml). -->
<aspectj>
  <!-- The aop.xml DTD declares a single optional <weaver> that precedes <aspects>, so all
       options (including -XaddSerialVersionUID, previously in a second <weaver>) live here. -->
  <weaver options="-verbose -showWeaveInfo -XaddSerialVersionUID">
    <!-- Restrict weaving to Spark's catalog package, which contains CatalogManager. -->
    <include within="org.apache.spark.sql.connector.catalog..*"/>
  </weaver>
  <aspects>
    <aspect name="com.github.melin.superior.jobserver.extensions.aspectj.CatalogManagerAspectj"/>
  </aspects>
</aspectj>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org