You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/09 09:05:34 UTC

[GitHub] [hudi] melin opened a new issue, #5537: Can iceberg and hudi catalog exist at the same time?

melin opened a new issue, #5537:
URL: https://github.com/apache/hudi/issues/5537

   ```scala
   val spark = SparkSession.builder().master("local").enableHiveSupport()
         .config("spark.sql.extensions",
           "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions," +
             "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
         .config("spark.sql.catalog.spark_catalog.type", "hive")
   
         .config("spark.sql.catalog.hudi", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
   
         .getOrCreate()
   ```
   
   iceberg catalog name use spark_catalog,hudi catalog name cannot use spark_catalog。
   
   Hudi For Spark 3.2, the additional spark_catalog config is required: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] asethia commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by "asethia (via GitHub)" <gi...@apache.org>.
asethia commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1443872559

   Is any further update on this? If the hack is the solution, what does it take to add it as part of the main code?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
codope commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1160589436

   @melin Did you get a chance to try out the above suggestion by @leesf ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1302885402

   @melin : gentle ping. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1123176143

   @YannByron @XuQianJin-Stars : can you folks follow up on this please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] leesf commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
leesf commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1146419871

   @melin I think you can specify `spark_catalog` to `HoodieCatalog` and custom catalog for iceberg catalog for a currently workaround, since Hudi currently do not support custom catalogs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] melin commented on issue #5537: Can iceberg and hudi catalog exist at the same time?

Posted by GitBox <gi...@apache.org>.
melin commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1121163324

   @leesf 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] melin commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
melin commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1122313372

   hack way
   ```java
   @Aspect
   public class CatalogManagerAspectj {
       private static final Logger LOG = LoggerFactory.getLogger(CatalogManagerAspectj.class);
   
       @Around("execution(org.apache.spark.sql.connector.catalog.CatalogManager.new(..))")
       public void aroundCatalogManagerInit(ProceedingJoinPoint pjp) throws Throwable {
           SuperiorHoodieCatalog.defaultSessionCatalog_$eq((CatalogPlugin) pjp.getArgs()[0]);
           pjp.proceed();
       }
   }
   ```
   
   ```java
   import org.apache.spark.sql.AnalysisException
   import org.apache.spark.sql.catalyst.TableIdentifier
   import org.apache.spark.sql.catalyst.analysis._
   import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange, UpdateColumnComment, UpdateColumnType}
   import org.apache.spark.sql.connector.catalog._
   import org.apache.spark.sql.connector.expressions.Transform
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
   import org.apache.spark.sql.hudi.catalog.SuperiorHoodieCatalog.defaultSessionCatalog
   import org.apache.spark.sql.hudi.command._
   import org.apache.spark.sql.types.{StructField, StructType}
   
   import java.util
   
   class SuperiorHoodieCatalog extends HoodieCatalog {
   
     override def name: String = "hudi";
   
     override def defaultNamespace: Array[String] = defaultSessionCatalog.defaultNamespace
   
     override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
       if (sparkAdapter.isHoodieTable(properties)) {
         HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
       } else {
         BasicStagedTable(
           ident,
           asTableCatalog.createTable(ident, schema, partitions, properties),
           this)
       }
     }
   
     override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
       if (sparkAdapter.isHoodieTable(properties)) {
         HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
       } else {
         asTableCatalog.dropTable(ident)
         BasicStagedTable(
           ident,
           asTableCatalog.createTable(ident, schema, partitions, properties),
           this)
       }
     }
   
     override def stageCreateOrReplace(ident: Identifier,
                                       schema: StructType,
                                       partitions: Array[Transform],
                                       properties: util.Map[String, String]): StagedTable = {
       if (sparkAdapter.isHoodieTable(properties)) {
         HoodieStagedTable(
           ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
       } else {
         try asTableCatalog.dropTable(ident) catch {
           case _: NoSuchTableException => // ignore the exception
         }
         BasicStagedTable(
           ident,
           asTableCatalog.createTable(ident, schema, partitions, properties),
           this)
       }
     }
   
     override def loadTable(ident: Identifier): Table = {
       try {
         asTableCatalog.loadTable(ident) match {
           case v1: V1Table if sparkAdapter.isHoodieTable(v1.catalogTable) =>
             HoodieInternalV2Table(
               spark,
               v1.catalogTable.location.toString,
               catalogTable = Some(v1.catalogTable),
               tableIdentifier = Some(ident.toString))
           case o => o
         }
       } catch {
         case e: Exception =>
           throw e
       }
     }
   
     override def createTable(ident: Identifier,
                              schema: StructType,
                              partitions: Array[Transform],
                              properties: util.Map[String, String]): Table = {
       createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
     }
   
     override def tableExists(ident: Identifier): Boolean = asTableCatalog.tableExists(ident)
   
     override def dropTable(ident: Identifier): Boolean = {
       val table = loadTable(ident)
       table match {
         case _: HoodieInternalV2Table =>
           DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = false).run(spark)
           true
         case _ => asTableCatalog.dropTable(ident)
       }
     }
   
     override def purgeTable(ident: Identifier): Boolean = {
       val table = loadTable(ident)
       table match {
         case _: HoodieInternalV2Table =>
           DropHoodieTableCommand(ident.asTableIdentifier, ifExists = true, isView = false, purge = true).run(spark)
           true
         case _ => asTableCatalog.purgeTable(ident)
       }
     }
   
     @throws[NoSuchTableException]
     @throws[TableAlreadyExistsException]
     override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
       loadTable(oldIdent) match {
         case _: HoodieInternalV2Table =>
           new AlterHoodieTableRenameCommand(oldIdent.asTableIdentifier, newIdent.asTableIdentifier, false).run(spark)
         case _ => asTableCatalog.renameTable(oldIdent, newIdent)
       }
     }
   
     override def alterTable(ident: Identifier, changes: TableChange*): Table = {
       val tableIdent = TableIdentifier(ident.name(), ident.namespace().lastOption)
       // scalastyle:off
       val table = loadTable(ident) match {
         case hoodieTable: HoodieInternalV2Table => hoodieTable
         case _ => return asTableCatalog.alterTable(ident, changes: _*)
       }
       // scalastyle:on
   
       val grouped = changes.groupBy(c => c.getClass)
   
       grouped.foreach {
         case (t, newColumns) if t == classOf[AddColumn] =>
           AlterHoodieTableAddColumnsCommand(
             tableIdent,
             newColumns.asInstanceOf[Seq[AddColumn]].map { col =>
               StructField(
                 col.fieldNames()(0),
                 col.dataType(),
                 col.isNullable)
             }).run(spark)
         case (t, columnChanges) if classOf[ColumnChange].isAssignableFrom(t) =>
           columnChanges.foreach {
             case dataType: UpdateColumnType =>
               val colName = UnresolvedAttribute(dataType.fieldNames()).name
               val newDataType = dataType.newDataType()
               val structField = StructField(colName, newDataType)
               AlterHoodieTableChangeColumnCommand(tableIdent, colName, structField).run(spark)
             case dataType: UpdateColumnComment =>
               val newComment = dataType.newComment()
               val colName = UnresolvedAttribute(dataType.fieldNames()).name
               val fieldOpt = table.schema().findNestedField(dataType.fieldNames(), includeCollections = true,
                 spark.sessionState.conf.resolver).map(_._2)
               val field = fieldOpt.getOrElse {
                 throw new AnalysisException(
                   s"Couldn't find column $colName in:\n${table.schema().treeString}")
               }
               AlterHoodieTableChangeColumnCommand(tableIdent, colName, field.withComment(newComment)).run(spark)
           }
         case (t, _) =>
           throw new UnsupportedOperationException(s"not supported table change: ${t.getClass}")
       }
   
       loadTable(ident)
     }
   
     @throws[NoSuchNamespaceException]
     override def listTables(namespace: Array[String]): Array[Identifier] = asTableCatalog.listTables(namespace)
   
   
     override def invalidateTable(ident: Identifier): Unit = {
       asTableCatalog.invalidateTable(ident)
     }
   
     @throws[NoSuchNamespaceException]
     override def listNamespaces: Array[Array[String]] = asNamespaceCatalog.listNamespaces
   
     @throws[NoSuchNamespaceException]
     override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
       asNamespaceCatalog.listNamespaces(namespace)
   
     override def namespaceExists(namespace: Array[String]): Boolean =
       asNamespaceCatalog.namespaceExists(namespace)
   
     @throws[NoSuchNamespaceException]
     override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
       asNamespaceCatalog.loadNamespaceMetadata(namespace)
   
     @throws[NamespaceAlreadyExistsException]
     override def createNamespace(namespace: Array[String], metadata: util.Map[String, String]): Unit = {
       asNamespaceCatalog.createNamespace(namespace, metadata)
     }
   
     @throws[NoSuchNamespaceException]
     override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
       asNamespaceCatalog.alterNamespace(namespace, changes:_*)
     }
   
     @throws[NoSuchNamespaceException]
     override def dropNamespace(namespace: Array[String]): Boolean =
       asNamespaceCatalog.dropNamespace(namespace)
   
     private def asTableCatalog: TableCatalog = defaultSessionCatalog.asInstanceOf[TableCatalog]
   
     private def asNamespaceCatalog: SupportsNamespaces =
       defaultSessionCatalog.asInstanceOf[SupportsNamespaces]
   }
   
   object SuperiorHoodieCatalog {
     var defaultSessionCatalog: CatalogPlugin = _
   }
   ```
   
   ```xml
   <?xml version="1.0" encoding="UTF-8" ?>
   <aspectj>
       <aspects>
           <aspect name="com.github.melin.superior.jobserver.extensions.aspectj.CatalogManagerAspectj"/>
       </aspects>
       <weaver options="-verbose -showWeaveInfo">
           <include within="org.apache.spark.sql.connector.catalog..*"/>
       </weaver>
       <weaver options="-XaddSerialVersionUID"/>
   </aspectj>
   
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1229360928

   @melin : any updates please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5537: hudi supports custom catalog name, spark_catalog is not mandatory

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5537:
URL: https://github.com/apache/hudi/issues/5537#issuecomment-1302885962

   @YannByron : looks like the author has given some hacky solution. Is there any enhancement we can add to hudi based on that. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org