Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/22 03:53:45 UTC

[GitHub] [hudi] stackfun opened a new issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

stackfun opened a new issue #1860:
URL: https://github.com/apache/hudi/issues/1860


   **Describe the problem you faced**
   
   In one PySpark job, I'm appending 10 rows to a COW table in a loop.
   In another PySpark job, I'm running a select count(*) on the same table in another loop.
   
   When querying with the Spark Datasource API, the count is unpredictable and only sometimes matches the expected number of rows.
   When querying with Hive, the select count(*) query returns the expected results.
   
   **To Reproduce**
   
   I'm running two PySpark jobs simultaneously on GCP using Dataproc.
   
   Writer Job
   ```python
   from pyspark.sql import SparkSession, functions
   import time
   
   table_name = "hudi_trips_cow"
   hudi_options = {
       "hoodie.table.name": table_name,
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
       "hoodie.datasource.write.partitionpath.field": "continent,country,city",
       "hoodie.datasource.write.table.name": table_name,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.datasource.hive_sync.enable": True,
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": table_name,
       "hoodie.datasource.hive_sync.username": "hive",
       "hoodie.datasource.hive_sync.password": "hive",
       "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://localhost:10000",
       "hoodie.datasource.hive_sync.partition_fields": "continent,country,city",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
   }
   
   def execute(spark: SparkSession, output_path: str):
       start_time = time.time()
       while time.time() < start_time + 60 * 15:
           df = generate_trips(spark)
           df.write.format("hudi").options(**hudi_options).mode("append").save(output_path)
   
   def generate_trips(spark):
       sc = spark.sparkContext
       dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
       inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
           dataGen.generateInserts(10)
       )
       # split partitionpath into its three components; necessary to sync with Hive
       df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
       split_col = functions.split(df["partitionpath"], "/")
       df = df.withColumn("continent", split_col.getItem(0))
       df = df.withColumn("country", split_col.getItem(1))
       return df.withColumn("city", split_col.getItem(2))
   
   spark = (
       SparkSession.builder.appName("test")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .enableHiveSupport()
       .getOrCreate()
   )
   execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
   ```
   
   Reader Job
   ```python
   from pyspark.sql import SparkSession, HiveContext
   import time
   
   def spark_query(spark: SparkSession, input_path: str):
       df = spark.read.format("org.apache.hudi").load(input_path + "/*/*/*/*")
       df.createOrReplaceTempView("trips_spark_temp")
       spark.catalog.refreshTable("trips_spark_temp")
       print("Spark Query:")
       spark.sql("select count(*) from trips_spark_temp").show()
   
   def hive_query(hive_context: HiveContext):
       hudi_trips_table = hive_context.table("default.hudi_trips_cow")
       hudi_trips_table.createOrReplaceTempView("trips_temp")
       hive_context.sql("REFRESH TABLE trips_temp")
       print("Hive Query:")
       hive_context.sql("select count(*) from trips_temp").show()
   
   def execute(spark: SparkSession, input_path: str):
       hive_context = HiveContext(spark.sparkContext)
   
       start_time = time.time()
       while time.time() < start_time + (15 * 60):
           spark_query(spark, input_path)
           hive_query(hive_context)
   
   spark = (
       SparkSession.builder.appName("test")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .enableHiveSupport()
       .getOrCreate()
   )
   execute(spark, "gs://random-gcs-folder-3adf/hudi-data")
   ```
   Output from Reader Job:
   ```
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |     545|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    1750|
   +--------+
   
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |    1760|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    1760|
   +--------+
   
   ```
   
   **Expected behavior**
   
   Queries using the Spark Datasource API should return the same results as the Hive queries.
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.10
   
   * Storage (HDFS/S3/GCS..) : GCS
   
   * Running on Docker? (yes/no) : no
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] stackfun commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
stackfun commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-663176816


   I used the setting you recommended, and still get similar results. In this run, I was inserting 200 records in the writer job. 
   ```
   Hive Query: 600
   Spark Query: 777
   Hive Query: 800
   Spark Query: 800
   Hive Query: 800
   Spark Query: 800
   Hive Query: 800
   Spark Query: 800
   Hive Query: 800
   Spark Query: 851
   Hive Query: 1000
   Spark Query: 1000
   ```
   
   I'm refreshing the table before each query, so the table metadata in Spark should be cleared. Does this seem like a bug to you, or is there some other setting that I should try?
   
   I was stress testing Hudi's atomic write feature, as our team is determining whether we can use Hudi for an efficient data lake. Querying the Hive table directly with Spark SQL seems to work flawlessly, so we're not blocked.
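
One way to read the counts above is to check them against the batch size. The following is a minimal sketch, not part of the original report. It assumes each writer commit adds exactly 200 new records with unique keys, so any count taken from a fully committed state should be a multiple of 200 and should never decrease. The function name `suspicious_counts` is hypothetical.

```python
def suspicious_counts(counts, batch_size):
    """Return counts that cannot correspond to a whole number of commits.

    Assumption (not from the thread): every commit adds exactly
    `batch_size` new rows, so a consistent read is a multiple of
    `batch_size` and is never lower than an earlier read.
    """
    flagged = []
    highest = 0
    for count in counts:
        if count % batch_size != 0 or count < highest:
            flagged.append(count)
        highest = max(highest, count)
    return flagged


# Counts copied from the run above, in the order they were printed.
hive_counts = [600, 800, 800, 800, 800, 1000]
spark_counts = [777, 800, 800, 800, 851, 1000]

print(suspicious_counts(hive_counts, 200))   # []
print(suspicious_counts(spark_counts, 200))  # [777, 851]
```

Under that assumption, only the Spark Datasource reads (777 and 851) fall between two commits, which is what a partially visible write would look like.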





[GitHub] [hudi] bvaradar closed issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bvaradar closed issue #1860:
URL: https://github.com/apache/hudi/issues/1860


   





[GitHub] [hudi] bvaradar commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-663413173


   I would expect the data to be the same across query engines unless there is some caching or GCS is not giving a consistent listing view.
   
   With Hudi's Spark datasource integration, Hudi reuses Spark's Parquet data source implementation and merely applies a file-level path filter to choose which files to read. You can run something like select(distinct("_hoodie_file_name")) in both cases to see whether any file is being missed. You can also run select(max("_hoodie_commit_time")) to find the latest commit time and check whether the two engines agree, as a check on atomicity. Otherwise, I suggest running similar experiments with plain Parquet or other datasets.
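
The two checks suggested above could be sketched in PySpark roughly as follows. This is an illustrative sketch, not code from the thread: the helper names `snapshot` and `compare` are hypothetical, and the only Hudi-specific pieces are the `_hoodie_file_name` and `_hoodie_commit_time` metadata columns mentioned in the comment. The PySpark import is deferred so that `compare` can be used on plain Python values.

```python
def snapshot(df):
    """Collect the distinct Hudi file names and the latest commit time of a DataFrame."""
    # Imported lazily so that compare() below works without a Spark installation.
    from pyspark.sql import functions as F

    rows = df.select("_hoodie_file_name").distinct().collect()
    files = {row["_hoodie_file_name"] for row in rows}
    latest = df.agg(F.max("_hoodie_commit_time").alias("latest")).collect()[0]["latest"]
    return files, latest


def compare(spark_snap, hive_snap):
    """Report files seen by only one engine and whether the latest commits agree."""
    spark_files, spark_latest = spark_snap
    hive_files, hive_latest = hive_snap
    return {
        "only_in_spark": sorted(spark_files - hive_files),
        "only_in_hive": sorted(hive_files - spark_files),
        "same_latest_commit": spark_latest == hive_latest,
    }


# Possible usage inside the reader job from the issue (paths and table name as posted there):
#   spark_df = spark.read.format("org.apache.hudi").load(input_path + "/*/*/*/*")
#   hive_df = spark.table("default.hudi_trips_cow")
#   print(compare(snapshot(spark_df), snapshot(hive_df)))
```

Because the writer keeps committing while the two snapshots are taken, a difference that only shows newer files and a newer commit on the later snapshot would be expected. A file missing from the earlier commit would point at the path filter or the storage listing.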
   
   





[GitHub] [hudi] stackfun edited a comment on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
stackfun edited a comment on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-662658795


   @bvaradar Thanks for your quick response. I ran the same test with the Hive query first and then the Spark query, and I'm still seeing similar results.
   
   ```
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    3410|
   +--------+
   
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |    3410|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    3410|
   +--------+
   
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |       0|
   +--------+
   
   Hive Query:
   +--------+
   |count(1)|
   +--------+
   |    3420|
   +--------+
   
   Spark Query:
   +--------+
   |count(1)|
   +--------+
   |    3420|
   +--------+
   ```








[GitHub] [hudi] bvaradar commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-662535162


   @stackfun : You are running the Spark query first, followed by the Hive query. Between the two runs, Hudi would have committed more data, which could be why you are seeing inconsistent results.





[GitHub] [hudi] bvaradar commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-668667634


   @stackfun : Were you able to figure this out?





[GitHub] [hudi] bvaradar commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-675438762


   Closing this due to inactivity.





[GitHub] [hudi] bhasudha commented on issue #1860: [SUPPORT] Issue when querying from Spark Datasource if COW table is being written to at the same time

Posted by GitBox <gi...@apache.org>.
bhasudha commented on issue #1860:
URL: https://github.com/apache/hudi/issues/1860#issuecomment-663036038


   I am not sure whether this has to do with Spark caching the table metadata. In any case, could you try setting the conf `spark.sql.hive.convertMetastoreParquet` to false, as shown here: https://hudi.apache.org/docs/docker_demo.html#step-4-b-run-spark-sql-queries and try again?
   
   Were you specifically testing the Spark Datasource API? Since your table is already registered in Hive, querying the Hive table directly with Spark SQL would also work.
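
For the reader job in this issue, the suggested setting and the direct Hive-table query might look like the sketch below. It is only an illustration: `READER_CONF`, `build_session`, and `hive_table_count` are hypothetical names, the table name `default.hudi_trips_cow` is taken from the original report, and the session can only be created on a cluster with PySpark and Hive support available.

```python
# Settings for the reader session. The serializer line is copied from the
# reader job in the issue; the second entry is the setting suggested above.
READER_CONF = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
}


def build_session(app_name="test", conf=READER_CONF):
    """Create a Hive-enabled SparkSession with the given settings applied."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.enableHiveSupport().getOrCreate()


def hive_table_count(spark, table="default.hudi_trips_cow"):
    """Count rows by querying the Hive-registered table through Spark SQL."""
    return spark.sql("select count(*) as cnt from {}".format(table)).collect()[0]["cnt"]


# Possible usage on the cluster:
#   spark = build_session()
#   print(hive_table_count(spark))
```

Setting the option when the session is built, rather than inside the query loop, keeps every query in the run under the same configuration.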

