Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/26 11:33:23 UTC

[GitHub] [hudi] rafcis02 opened a new issue, #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

rafcis02 opened a new issue, #5694:
URL: https://github.com/apache/hudi/issues/5694

   **Describe the problem you faced**
   
   I've been struggling with a failing synchronization with the Glue Catalog. I have a process (an AWS Glue job) which reads from a Hudi table and then writes to the Hudi table as well. Data is saved properly to the S3 bucket, but the job fails on the Hive table synchronization - it tries to create a database and table that already exist in the Glue Catalog. This started happening when we added code which queries the Hudi table timeline - without that piece of code it works fine.
   
   **To Reproduce**
   You need an existing Hudi table in an S3 bucket, with the table registered in the AWS Glue Catalog.
   Steps to reproduce the behavior:
   
   1. Query the Hudi table timeline e.g. to get inflight instants
   2. Read the data from Hudi table
   3. Upsert data to Hudi table
   
   Here is the Glue job script which you can use to reproduce it - you need to provide the S3 bucket and Glue database name (it cannot be `default` - you have to create a separate database for that). The script contains a part that initializes the table on S3 with sample data. It has 3 scenarios:
   * scenario 1 (job result: SUCCESS) - without querying the table timeline. Just to make sure it works properly.
   * scenario 2 (job result: FAILURE) - with querying the table timeline before reading data from the table.
   * scenario 3 (job result: SUCCESS) - same as scenario 2, but it invokes the `Hive.closeCurrent()` method before upserting data
   
   Each step runs as a separate Glue job run (including initializing the sample data).
   
   ```scala
   import org.apache.hadoop.hive.ql.metadata.Hive
   import org.apache.hudi.DataSourceReadOptions
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.common.fs.FSUtils
   import org.apache.hudi.common.table.HoodieTableMetaClient
   import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.hudi.hive.ddl.HiveSyncMode
   import org.apache.hudi.hive.util.ConfigUtils
   import org.apache.hudi.keygen.ComplexKeyGenerator
   import org.apache.log4j.{Level, Logger}
   import org.apache.spark.sql.hudi.HoodieOptionConfig.{SQL_KEY_PRECOMBINE_FIELD, SQL_KEY_TABLE_PRIMARY_KEY, SQL_KEY_TABLE_TYPE}
   import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSparkSessionExtension}
   import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
   
   import java.sql.Timestamp
   import java.time.Instant
   import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsJavaMapConverter}
   
   
   object TestJob {
   
     val s3BucketName = "bucketName"
     val glueDatabaseName = "databaseName" // cannot be "default": with the default database the job does not fail
     val tableName = "test"
     val tablePath = s"s3://$s3BucketName/$tableName"
   
     val spark: SparkSession = SparkSession.builder()
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .withExtensions(new HoodieSparkSessionExtension())
       .enableHiveSupport()
       .getOrCreate()
   
     def main(args: Array[String]): Unit = {
       import spark.implicits._
       Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
   
   //     init the table on S3 with sample data
       writeDF(List(
         (1, "first", 2020, Timestamp.from(Instant.now())),
         (2, "second", 2020, Timestamp.from(Instant.now())),
         (3, "third", 2020, Timestamp.from(Instant.now())),
         (4, "fourth", 2020, Timestamp.from(Instant.now())),
         (5, "fifth", 2020, Timestamp.from(Instant.now()))
       ).toDF("id", "description", "year", "mod_date"))
   
   //    scenario1WithoutHoodieTableMetaClient()
   //    scenario2WithHoodieTableMetaClient()
   //    scenario3WithHoodieTableMetaClientAndHiveCloseCurrent()
     }
   
     def writeDF(df: DataFrame): Unit = {
       new HudiUpsertTarget(
         tableName = tableName,
         databaseName = glueDatabaseName,
         s3BucketName = s3BucketName,
         partitionColumns = List("year"),
         recordKeyColumns = List("id"),
         preCombineKeyColumn = "mod_date"
       ).writeEntity(df)
     }
   
     def readTable(): DataFrame = {
       spark.read
         .format("org.apache.hudi")
         .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
         .load(tablePath)
     }
   
     def queryHudiTimeline(): Unit = {
       val fs = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration)
       val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
       val ongoingCommits = metaClient.getActiveTimeline.getCommitsTimeline
         .filterInflightsAndRequested()
         .getInstants.iterator().asScala.toList
       println(ongoingCommits)
     }
   
     def scenario1WithoutHoodieTableMetaClient(): Unit = {
       val sourceDF = readTable()
       writeDF(sourceDF)
     }
   
     def scenario2WithHoodieTableMetaClient(): Unit = {
       queryHudiTimeline()
       val sourceDF = readTable()
       writeDF(sourceDF)
     }
   
     def scenario3WithHoodieTableMetaClientAndHiveCloseCurrent(): Unit = {
       queryHudiTimeline()
       val sourceDF = readTable()
       Hive.closeCurrent()
       writeDF(sourceDF)
     }
   }
   
   
   trait HudiWriterOptions {
   
     def tableName: String
   
     def databaseName: String
   
     def s3BucketName: String
   
     def fullTableName: String = s"$databaseName.$tableName"
   
     def s3LocationPath: String = s"s3://$s3BucketName/$tableName/"
   
     def partitionColumns: List[String]
   
     def recordKeyColumns: List[String]
   
     def preCombineKeyColumn: String
   
     def hudiTableOptions = Map(
       TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
       OPERATION.key() -> UPSERT_OPERATION_OPT_VAL,
       TBL_NAME.key() -> tableName,
       RECORDKEY_FIELD.key() -> recordKeyColumns.mkString(","),
       PARTITIONPATH_FIELD.key() -> partitionColumns.mkString(","),
       KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName,
       PRECOMBINE_FIELD.key() -> preCombineKeyColumn,
       URL_ENCODE_PARTITIONING.key() -> "false"
     )
   
     def sqlTableOptions = Map(
       SQL_KEY_TABLE_TYPE.sqlKeyName -> HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW,
       SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName -> hudiTableOptions(RECORDKEY_FIELD.key()),
       SQL_KEY_PRECOMBINE_FIELD.sqlKeyName -> hudiTableOptions(PRECOMBINE_FIELD.key())
     )
   
     def hiveTableOptions = Map(
       HIVE_SYNC_MODE.key() -> HiveSyncMode.HMS.name(),
       HIVE_SYNC_ENABLED.key() -> "true",
       HIVE_DATABASE.key() -> databaseName,
       HIVE_TABLE.key() -> hudiTableOptions(TBL_NAME.key()),
       HIVE_PARTITION_FIELDS.key() -> hudiTableOptions(PARTITIONPATH_FIELD.key()),
       HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
       HIVE_STYLE_PARTITIONING.key() -> "true",
       HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> "true",
       HIVE_TABLE_SERDE_PROPERTIES.key() -> ConfigUtils.configToString(sqlTableOptions.asJava)
     )
   
     def writerOptions: Map[String, String] = hudiTableOptions ++ hiveTableOptions
   }
   
   class HudiUpsertTarget(override val tableName: String,
                          override val databaseName: String,
                          override val s3BucketName: String,
                          override val partitionColumns: List[String],
                          override val recordKeyColumns: List[String],
                          override val preCombineKeyColumn: String
                         ) extends HudiWriterOptions {
   
     lazy val spark: SparkSession = SparkSession.active
   
     def writeEntity(dataFrame: DataFrame): Unit = {
       if (!spark.catalog.tableExists(databaseName, tableName)) {
         dataFrame.write
           .format("org.apache.hudi")
           .options(writerOptions)
           .mode(SaveMode.Append)
           .save(s3LocationPath)
       } else {
         upsertData(dataFrame)
       }
     }
   
     private def upsertData(dataFrame: DataFrame) = {
       val mergeOnStatement = recordKeyColumns
         .map(k => s"hudi_target.$k = hudi_input.$k")
         .mkString(" AND ")
   
       val mergeIntoStatement =
         s"""MERGE INTO $databaseName.$tableName AS hudi_target
            | USING (SELECT * FROM source_data_set) hudi_input
            | ON $mergeOnStatement
            | WHEN MATCHED THEN UPDATE SET *
            | WHEN NOT MATCHED THEN INSERT *
            |""".stripMargin
   
       dataFrame.createTempView("source_data_set")
       spark.sql(mergeIntoStatement)
       spark.catalog.dropTempView("source_data_set")
     }
   }
   
   ```
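   
   The workaround from scenario 3 could be factored into a small helper. This is only a sketch and was not run as part of this report: `HiveSessionReset` and `writeAfterReset` are hypothetical names, and it assumes that resetting the current Hive session right before the write is what makes scenario 3 pass. The reset action is passed in as a function, so the helper itself has no Hive or Hudi dependency.
   
   ```scala
   object HiveSessionReset {

     // Hypothetical helper: runs `reset` first, then evaluates `write`.
     // `write` is a by-name parameter, so it is not evaluated until
     // after `reset` has returned.
     def writeAfterReset[A](reset: () => Unit)(write: => A): A = {
       reset()
       write
     }
   }
   ```
   
   In the script above it would be used as `HiveSessionReset.writeAfterReset(() => Hive.closeCurrent())(writeDF(sourceDF))`, which is equivalent to the two calls at the end of scenario 3.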
   
   **Expected behavior**
   
   Hive sync should succeed and the job should complete without failure - it should detect that the database and table already exist in the Glue Catalog and that there is no schema change.
   
   **Environment Description**
   
   * Running on AWS Glue 2.0
   
   * Hudi version : 0.10.1
   
   * Spark version : 2.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   **Additional context**
   
   Comparing scenarios 1 and 3 to scenario 2, I noticed that scenario 2 does not create the AWS Glue client to sync the table metadata. Here is the log line that I can find for scenarios 1 and 3 but not for 2:
   ```
    metastore.AWSGlueClientFactory (AWSGlueClientFactory.java:newClient(55)): Setting glue service endpoint to https://glue.eu-west-1.amazonaws.com
   ```
   
   **Stacktrace**
   
   ```
   2022-05-26 11:23:08,800 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(91)): Exception in User Class
   org.apache.hudi.exception.HoodieException: Got runtime exception when hive syncing test
   	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:118)
   	at org.apache.hudi.HoodieSparkSqlWriter$.org$apache$hudi$HoodieSparkSqlWriter$$syncHive(HoodieSparkSqlWriter.scala:539)
   	at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:595)
   	at org.apache.hudi.HoodieSparkSqlWriter$$anonfun$metaSync$2.apply(HoodieSparkSqlWriter.scala:591)
   	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
   	at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:591)
   	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:664)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:284)
   	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.executeUpsert(MergeIntoHoodieTableCommand.scala:285)
   	at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:155)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
   	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
   	at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
   	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
   	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
   	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
   	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
   	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
   	at HudiUpsertTarget.upsertData(test.scala:185)
   	at HudiUpsertTarget.writeEntity(test.scala:167)
   	at TestJob$.writeDF(test.scala:60)
   	at TestJob$.scenario2WithHoodieTableMetaClient(test.scala:87)
   	at TestJob$.main(test.scala:48)
   	at TestJob.main(test.scala)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at com.amazonaws.services.glue.SparkProcessLauncherPlugin$class.invoke(ProcessLauncher.scala:48)
   	at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
   	at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:142)
   	at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
   	at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
   Caused by: org.apache.hudi.hive.HoodieHiveSyncException: failed to create table test
   	at org.apache.hudi.hive.ddl.HMSDDLExecutor.createTable(HMSDDLExecutor.java:129)
   	at org.apache.hudi.hive.HoodieHiveClient.createTable(HoodieHiveClient.java:213)
   	at org.apache.hudi.hive.HiveSyncTool.syncSchema(HiveSyncTool.java:243)
   	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:184)
   	at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:129)
   	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:115)
   	... 37 more
   Caused by: InvalidObjectException(message:databaseName)
   	at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1453)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
   	at com.sun.proxy.$Proxy63.create_table_with_environment_context(Unknown Source)
   	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.create_table_with_environment_context(HiveMetaStoreClient.java:2050)
   	at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.create_table_with_environment_context(SessionHiveMetaStoreClient.java:97)
   	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:669)
   	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:657)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:152)
   	at com.sun.proxy.$Proxy64.createTable(Unknown Source)
   	at org.apache.hudi.hive.ddl.HMSDDLExecutor.createTable(HMSDDLExecutor.java:126)
   	... 42 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] rafcis02 commented on issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
rafcis02 commented on issue #5694:
URL: https://github.com/apache/hudi/issues/5694#issuecomment-1208156812

   @alexeykudinkin 
   **Environment Description**
   
   * Running on AWS Glue 2.0
   
   * Hudi version : 0.10.1
   
   * Spark version : 2.4
   
   * Storage (HDFS/S3/GCS..) : S3
   
   The same issue occurs with Hudi 0.11.0, but it works fine with 0.11.1. So it seems it has been fixed in the latest release.
   
   




[GitHub] [hudi] alexeykudinkin commented on issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on issue #5694:
URL: https://github.com/apache/hudi/issues/5694#issuecomment-1208379071

   @xushiyan can you please help close this one?




[GitHub] [hudi] alexeykudinkin commented on issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
alexeykudinkin commented on issue #5694:
URL: https://github.com/apache/hudi/issues/5694#issuecomment-1207102827

   @rafcis02 can you also specify the Glue/Hive configs you're using?
   
   Did you try to run it w/ 0.11.1? I was not able to reproduce your issue with the code-snippet you've pasted.




[GitHub] [hudi] xushiyan closed issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
xushiyan closed issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog
URL: https://github.com/apache/hudi/issues/5694




[GitHub] [hudi] xushiyan commented on issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
xushiyan commented on issue #5694:
URL: https://github.com/apache/hudi/issues/5694#issuecomment-1204626523

   > @xushiyan : recently we had a fix around the AWS Glue client factory, right? Is that related to this issue?
   
   No, the recent fix, which resolved the Glue catalog connection issue, is not relevant to this.




[GitHub] [hudi] nsivabalan commented on issue #5694: [SUPPORT] Querying Hudi Timeline causes Hive sync failure with AWS Glue Catalog

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5694:
URL: https://github.com/apache/hudi/issues/5694#issuecomment-1149299485

   @xushiyan : recently we had a fix around the AWS Glue client factory, right? Is that related to this issue?

