Posted to commits@hudi.apache.org by "soumilshah1995 (via GitHub)" <gi...@apache.org> on 2023/03/16 13:14:13 UTC

[GitHub] [hudi] soumilshah1995 opened a new issue, #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

soumilshah1995 opened a new issue, #8207:
URL: https://github.com/apache/hudi/issues/8207

   
   I am trying to implement consistent hashing using the example code given in the release notes on the website.
   
   ```
   """"
   :Consistent Hashing
   Hudi supports Upsert operation to de-duplicate records in a table, which depends on indexing schemes to perform record location lookup. Among many index options, bucket index (in progress, RFC-29) achieves promising Upsert performance, around ~3x improvement on throughput compared to using Bloom Filter. However, it requires pre-configure a fixed bucket number and cannot be changed afterwards. Combined with the design of one-one mapping between hash buckets and file groups, hudi tables with bucket index have some practical issues, such as data skew and unlimited file group size, which now can only be resolved by resetting a suitable bucket number through re-writing the whole table.
   Problems can be solved by introducing Consistent Hashing Index. It achieves bucket resizing by splitting or merging several local buckets (i.e., only large file groups) while leaving most buckets untouched. This feature allows us to adjust bucket number dynamically in a background service with minimal impacts on downstream systems relying on Hudi. For example, concurrent readers and writers are not blocked during the resizing.
   
   """
   
   try:
   
       import os
       import sys
       import uuid
   
       import pyspark
       from pyspark.sql import SparkSession
       from pyspark import SparkConf, SparkContext
       from pyspark.sql.functions import col, asc, desc
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from pyspark.sql.types import *
       from datetime import datetime
       from functools import reduce
       from faker import Faker
       import datetime
   
   except Exception as e:
       pass
   
   
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   
   
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   
   
   import uuid
   from faker import Faker
   
   faker = Faker()
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data(samples):
           return [
               (
                   uuid.uuid4().__str__(),
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
                   faker.year(),
                   faker.month()
   
               ) for x in range(samples)
           ]
   
   
   db_name = "hudidb"
   table_name = "hudi_bucket_consistent_hasing_index"
   
   recordkey = 'emp_id,state'
   path = f"file:///C:/tmp/{db_name}/{table_name}"
   precombine = "ts"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   BUCKET_INDEX_HASH_FEILD = 'state'
   PARTITION_FIELD = 'year'
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine
   
   
       ,"hoodie.index.type":"BUCKET"
       ,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
       ,'hoodie.bucket.index.max.num.buckets':128
       ,'hoodie.bucket.index.min.num.buckets':32
       ,"hoodie.bucket.index.num.buckets":4
   
       ## do split if the bucket size reaches 1.5 * max_file_size
       ,"hoodie.bucket.index.split.threshold":1.5
       ## do merge if the bucket size is smaller than 0.1 * max_file_size
       ,"hoodie.bucket.index.merge.threshold": 0.1
       ,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
   
   
       ,"hoodie.clustering.inline":"true"
       ,"hoodie.clustering.inline.max.commit":2
       ,"hoodie.clustering.inline.max.commits":2
       ,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
       ,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"
       ,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
       ,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
       ,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
   
   
   
       ,"hoodie.clean.automatic": "true"
       , "hoodie.clean.async": "true"
       , "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
       , "hoodie.cleaner.fileversions.retained": "3"
       , "hoodie-conf hoodie.cleaner.parallelism": '200'
       , 'hoodie.cleaner.commits.retained': 2
   
   }
   
   
   data = DataGenerator.get_data(1000)
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
   spark_df = spark.createDataFrame(data=data, schema=columns)
   
   
   start = datetime.datetime.now()
   spark_df.write.format("hudi"). \
       options(**hudi_options). \
       mode("append"). \
       save(path)
   end = datetime.datetime.now()
   print(f"Execution Time {end-start}")
   """"
   
   ### The Consistent Hashing Index is still an evolving feature, and there are currently some limitations to using it as of 0.13.0:
   
   ###### This index is supported only for the Spark engine using a MOR table.
   * It does not work with the metadata table enabled.
   * To scale up or shrink the buckets, users have to manually trigger clustering using the above configs (at some cadence), but they cannot have compaction running concurrently.
   
   So, if compaction is enabled in your regular write pipeline, please follow this recommendation: you can choose to trigger the scale/shrink once every 12 hours. In such cases, once every 12 hours, you might need to disable compaction, stop your write pipeline, and enable clustering. You should take extreme care not to run both concurrently, because it might result in conflicts and a failed pipeline. Once clustering is complete, you can resume your regular write pipeline with compaction enabled (a sketch of this two-phase setup is shown after this code block).
   """
   
   ```
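
   To illustrate the cadence described in the quoted limitations, here is a minimal sketch of how the two phases could be configured. This is my own illustration using standard Hudi configs (`hoodie.compact.inline`, `hoodie.clustering.inline`); the exact cadence and values are assumptions, not something prescribed by the release notes.
   
   ```
   # Regular write pipeline: compaction enabled, consistent-hashing clustering disabled.
   regular_run_options = {
       **hudi_options,
       "hoodie.compact.inline": "true",
       "hoodie.clustering.inline": "false",
   }
   
   # Periodic resize run (e.g. every 12 hours): stop the regular pipeline first, then
   # write with compaction disabled and clustering enabled so the two never overlap.
   resize_run_options = {
       **hudi_options,
       "hoodie.compact.inline": "false",
       "hoodie.clustering.inline": "true",
   }
   ```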
   
   #### The code works fine the first time I run it; when I run the same code again, it throws this error
   ```
   
   ---------------------------------------------------------------------------
   Py4JJavaError                             Traceback (most recent call last)
   <timed exec> in <module>
   
   ~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
       966             self._jwrite.save()
       967         else:
   --> 968             self._jwrite.save(path)
       969 
       970     @since(1.4)
   
   ~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
      1319 
      1320         answer = self.gateway_client.send_command(command)
   -> 1321         return_value = get_return_value(
      1322             answer, self.gateway_client, self.target_id, self.name)
      1323 
   
   ~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
       188     def deco(*a: Any, **kw: Any) -> Any:
       189         try:
   --> 190             return f(*a, **kw)
       191         except Py4JJavaError as e:
       192             converted = convert_exception(e.java_exception)
   
   ~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
       324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325             if answer[1] == REFERENCE_TYPE:
   --> 326                 raise Py4JJavaError(
       327                     "An error occurred while calling {0}{1}{2}.\n".
       328                     format(target_id, ".", name), value)
   
   Py4JJavaError: An error occurred while calling o129.save.
   : java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
   	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
   	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
   	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
   	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
   	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
   	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
   	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
   Caused by: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
   	at org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy.performClusteringWithRecordsAsRow(SparkConsistentBucketClusteringExecutionStrategy.java:67)
   	at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
   	... 6 more
   
   ```
   
   




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1473207550

   Maybe try `upsert` instead of `append`. The consistent hashing BUCKET index does not support the Spark row interface.




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476521258

   Any idea why I am seeing this error?




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476357236

   ### Complete Code for test cases 
   
   ```
   try:
   
       import os
       import sys
       import uuid
   
       import pyspark
       from pyspark.sql import SparkSession
       from pyspark import SparkConf, SparkContext
       from pyspark.sql.functions import col, asc, desc
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from pyspark.sql.types import *
       from datetime import datetime
       from functools import reduce
       from faker import Faker
       import datetime
   
   except Exception as e:
       pass
   
   
   
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   
       
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   
   
   db_name = "hudidb"
   table_name = "hudi_bucket_consistent_hasing_test"
   
   recordkey = 'emp_id,state'
   path = f"file:///C:/tmp/{db_name}/{table_name}"
   precombine = "ts"
   method = 'upsert'
   table_type = "MERGE_ON_READ" 
   BUCKET_INDEX_HASH_FEILD = 'state'
   PARTITION_FIELD = 'year'
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine
       ,"hoodie.datasource.write.row.writer.enable":"false"
   
       
       ,"hoodie.index.type":"BUCKET"
       ,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
       ,'hoodie.bucket.index.max.num.buckets':128
       ,'hoodie.bucket.index.min.num.buckets':32 
       ,"hoodie.bucket.index.num.buckets":4
       
       ## do split if the bucket size reaches 1.5 * max_file_size
       ,"hoodie.bucket.index.split.threshold":1.5
       ## do merge if the bucket size is smaller than 0.1 * max_file_size
       ,"hoodie.bucket.index.merge.threshold": 0.1
       ,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
          
       
       ,"hoodie.clustering.inline":"true"
       ,"hoodie.clustering.inline.max.commit":2
       ,"hoodie.clustering.inline.max.commits":2
       ,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
       ,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"    
       ,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
       ,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
       ,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
       
       ,"hoodie.clean.automatic": "true"
       , "hoodie.clean.async": "true"
       , "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
       , "hoodie.cleaner.fileversions.retained": "3"
       , "hoodie-conf hoodie.cleaner.parallelism": '200'
       , 'hoodie.cleaner.commits.retained': 2   
   }
   
   
   
   for i in range(1,5):
       print("Batch :{} ".format(i))
       data = DataGenerator.get_data(1000)
       columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
       spark_df = spark.createDataFrame(data=data, schema=columns)
   
   
       start = datetime.datetime.now()
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
       end = datetime.datetime.now()
       print(f"Execution Time {end-start}")
       
       
   ```




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1473939179

   Can you share code snippets? I am not sure I understand.
   I already have the UPSERT method set.
   




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475446095

   Yes, the config you are using is correct; it is the same as the one posted in the release notes.
   
   However, consistent hashing does not support the `row` interface, as the error message says. To get around the `row` interface, use upsert mode.
   
   ```
   for i in range(1,5):
           
       data = DataGenerator.get_data(1000)
       columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
       spark_df = spark.createDataFrame(data=data, schema=columns)
       
       
       start = datetime.datetime.now()
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("upsert"). \
           save(path)
       end = datetime.datetime.now()
       print(f"Execution Time {end-start}")
   ```




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1478470507

   Here is the video and exercise file:
   https://soumilshah1995.blogspot.com/2023/03/topic-consistent-hashing-rfc-42.html
   
   https://www.youtube.com/watch?v=zN8JOBKXxP0
   
   




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477868610

   Works like a charm, thanks! I will make a video for the community on my YouTube channel.
   I want to say thank you for helping me and looking into the issue.




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475271371

   Hi 
   I am trying to perform an append. The first time it works fine; the next time it throws an error.
   Here is the complete code:
   
   ```
   
   try:
   
       import os
       import sys
       import uuid
   
       import pyspark
       from pyspark.sql import SparkSession
       from pyspark import SparkConf, SparkContext
       from pyspark.sql.functions import col, asc, desc
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from pyspark.sql.types import *
       from datetime import datetime
       from functools import reduce
       from faker import Faker
       import datetime
   
   except Exception as e:
       pass
   
   SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
   
   
   
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   
   import uuid
   from faker import Faker
   
   faker = Faker()
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data(samples):
           return [
               (
                   uuid.uuid4().__str__(),
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
                   faker.year(),
                   faker.month()
   
               ) for x in range(samples)
           ]
   
   
   
   db_name = "hudidb"
   table_name = "hudi_bucket_consistent_hasing_index"
   
   recordkey = 'emp_id,state'
   path = f"file:///C:/tmp/{db_name}/{table_name}"
   precombine = "ts"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   BUCKET_INDEX_HASH_FEILD = 'state'
   PARTITION_FIELD = 'year'
   
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.precombine.field': precombine
   
   
       ,"hoodie.index.type":"BUCKET"
       ,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
       ,'hoodie.bucket.index.max.num.buckets':128
       ,'hoodie.bucket.index.min.num.buckets':32
       ,"hoodie.bucket.index.num.buckets":4
   
       ## do split if the bucket size reaches 1.5 * max_file_size
       ,"hoodie.bucket.index.split.threshold":1.5
       ## do merge if the bucket size is smaller than 0.1 * max_file_size
       ,"hoodie.bucket.index.merge.threshold": 0.1
       ,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
   
   
       ,"hoodie.clustering.inline":"true"
       ,"hoodie.clustering.inline.max.commit":2
       ,"hoodie.clustering.inline.max.commits":2
       ,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
       ,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"
       ,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
       ,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
       ,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
   
       ,"hoodie.clean.automatic": "true"
       , "hoodie.clean.async": "true"
       , "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
       , "hoodie.cleaner.fileversions.retained": "3"
       , "hoodie-conf hoodie.cleaner.parallelism": '200'
       , 'hoodie.cleaner.commits.retained': 2
   
   }
   
   
   
   for i in range(1,5):
           
       data = DataGenerator.get_data(1000)
       columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
       spark_df = spark.createDataFrame(data=data, schema=columns)
       
       
       start = datetime.datetime.now()
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
       end = datetime.datetime.now()
       print(f"Execution Time {end-start}")
   
   ```
   #### Error Messages
   ```
   Py4JJavaError: An error occurred while calling o180.save.
   : java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
   	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
   	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
   	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
   	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
   	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
   	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
   	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
   Caused by: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
   	at org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy.performClusteringWithRecordsAsRow(SparkConsistentBucketClusteringExecutionStrategy.java:67)
   	at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
   	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
   	... 6 more
   
   
   ```
   
   
   #### Please let me know if I am doing something wrong here




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477844690

   Sure, I will try it.




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476354557

   I got this error:
   
   ```
   Py4JJavaError: An error occurred while calling o621.save.
   : org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:///C:/tmp/hudidb/hudi_bucket_consistent_hasing_index/.hoodie/metadata commits 20230320103632686
   	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:823)
   	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:727)
   	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:711)
   	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:706)
   	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:836)
   	at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:156)
   	at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:835)
   	at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:820)
   	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:165)
   	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:830)
   	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:897)
   	at org.apache.hudi.client.BaseHoodieWriteClient.lambda$writeTableMetadata$1(BaseHoodieWriteClient.java:355)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
   	at org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:355)
   	at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:282)
   	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
   	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:102)
   	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:945)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:372)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
   	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
   	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
   	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
   	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
   	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
   	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
   	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
   	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
   	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
   	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
   	at java.base/java.lang.Thread.run(Thread.java:1589)
   Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20230320103632686__deltacommit__REQUESTED]
   	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:300)
   	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:242)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:286)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:250)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:114)
   	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:135)
   	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:194)
   	at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:806)
   	... 58 more
   
   ```




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477817743

   Adding `"hoodie.metadata.enable": "false"` works fine in my local env. Could you try again with this additional config?
   
   By the way, your initial bucket number `hoodie.bucket.index.num.buckets: 4` is smaller than `hoodie.bucket.index.min.num.buckets: 32`, so the min config will effectively be overridden by the initial bucket number (i.e., 4).
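
   A minimal sketch of how those two suggestions could be folded into the `hudi_options` dict from the code above. This is my own illustration; the bucket numbers shown here are assumptions, not values prescribed in this thread.
   
   ```
   # Hypothetical additions to the hudi_options dict posted earlier in this issue.
   extra_options = {
       # Consistent hashing does not work with the metadata table enabled (0.13.0 limitation).
       "hoodie.metadata.enable": "false",
       # Keep the initial bucket number >= the configured minimum, otherwise the
       # minimum is effectively overridden by the initial value (4 in the original config).
       "hoodie.bucket.index.num.buckets": 32,
       "hoodie.bucket.index.min.num.buckets": 32,
       "hoodie.bucket.index.max.num.buckets": 128,
   }
   hudi_options.update(extra_options)
   ```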




[GitHub] [hudi] soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
URL: https://github.com/apache/hudi/issues/8207




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476193225

   Hi,
   ```
   mode("upsert"). \
   ```
   
   I don't think this is a valid mode:
   IllegalArgumentException: Unknown save mode: upsert. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists', 'default'.
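
   For reference, a minimal sketch of the call that seems intended here (assuming the `hudi_options` and `spark_df` from the snippets above): `upsert` is a Hudi write operation selected through `hoodie.datasource.write.operation`, not a Spark save mode, so the DataFrame writer keeps `mode("append")`.
   
   ```
   # Sketch only: the Hudi operation comes from the options, the Spark save mode stays "append".
   (
       spark_df.write.format("hudi")
       .options(**hudi_options)  # already contains 'hoodie.datasource.write.operation': 'upsert'
       .mode("append")
       .save(path)
   )
   ```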
   
   
   




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476329201

   That worked, thanks! I will be making a video for the community :D




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476323878

   Sure, I will try that now.
   




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477155606

   It seems related to the metadata table, which should be disabled in the consistent hashing case. I will test your code in my local env and get back to you later today.




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477759103

   Thanks a lot :D 




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475187308

   ```
   start = datetime.datetime.now()
   spark_df.write.format("hudi"). \
       options(**hudi_options). \
       mode("append"). \
       save(path)
   ```
   
   In the above code you posted, maybe change mode `append` to `upsert`?






[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476529101

   I have attached the full code so you can try it at your end if needed, @YuweiXiao.




[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476186701

   Got it, let me try.




[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables

Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476214336

   Oh yes... sorry for the mistake. Did you explicitly enable `hoodie.datasource.write.row.writer.enable`?
   
   Could you try putting it into your config and setting it to false?
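
   A minimal sketch of that change, assuming the same `hudi_options` dict from the code above (the complete test code posted elsewhere in this thread includes the same option):
   
   ```
   # Disable the Spark row-writer path, which the consistent hashing bucket index does not support yet.
   hudi_options["hoodie.datasource.write.row.writer.enable"] = "false"
   
   # Then write exactly as before, keeping the standard Spark save mode.
   spark_df.write.format("hudi") \
       .options(**hudi_options) \
       .mode("append") \
       .save(path)
   ```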

