Posted to commits@hudi.apache.org by "soumilshah1995 (via GitHub)" <gi...@apache.org> on 2023/03/16 13:14:13 UTC
[GitHub] [hudi] soumilshah1995 opened a new issue, #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
soumilshah1995 opened a new issue, #8207:
URL: https://github.com/apache/hudi/issues/8207
I am trying to implement consistent hashing using the example code given in the release notes on the website.
```
""""
:Consistent Hashing
Hudi supports Upsert operation to de-duplicate records in a table, which depends on indexing schemes to perform record location lookup. Among many index options, bucket index (in progress, RFC-29) achieves promising Upsert performance, around ~3x improvement on throughput compared to using Bloom Filter. However, it requires pre-configure a fixed bucket number and cannot be changed afterwards. Combined with the design of one-one mapping between hash buckets and file groups, hudi tables with bucket index have some practical issues, such as data skew and unlimited file group size, which now can only be resolved by resetting a suitable bucket number through re-writing the whole table.
Problems can be solved by introducing Consistent Hashing Index. It achieves bucket resizing by splitting or merging several local buckets (i.e., only large file groups) while leaving most buckets untouched. This feature allows us to adjust bucket number dynamically in a background service with minimal impacts on downstream systems relying on Hudi. For example, concurrent readers and writers are not blocked during the resizing.
"""
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    # Imported last so the module (used below as datetime.datetime.now()) is not shadowed.
    import datetime
except ImportError as e:
    # Stop here: nothing below can run if a dependency is missing.
    raise SystemExit(f"Missing dependency: {e}")
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
# Single Faker instance, named `fake` so it does not shadow the faker module.
fake = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        # Each tuple matches the `columns` list used when building the DataFrame.
        return [
            (
                str(uuid.uuid4()),
                fake.name(),
                fake.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                fake.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(fake.random_int(min=10000, max=150000)),
                str(fake.random_int(min=18, max=60)),
                str(fake.random_int(min=0, max=100000)),
                str(fake.unix_time()),
                fake.email(),
                fake.credit_card_number(card_type='amex'),
                fake.year(),
                fake.month()
            ) for _ in range(samples)
        ]
db_name = "hudidb"
table_name = "hudi_bucket_consistent_hasing_index"
recordkey = 'emp_id,state'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "ts"
method = 'upsert'
table_type = "MERGE_ON_READ"
BUCKET_INDEX_HASH_FEILD = 'state'
PARTITION_FIELD = 'year'
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': table_type,
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.precombine.field': precombine
,"hoodie.index.type":"BUCKET"
,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
,'hoodie.bucket.index.max.num.buckets':128
,'hoodie.bucket.index.min.num.buckets':32
,"hoodie.bucket.index.num.buckets":4
## do split if the bucket size reaches 1.5 * max_file_size
,"hoodie.bucket.index.split.threshold":1.5
## do merge if the bucket size is smaller than 0.1 * max_file_size
,"hoodie.bucket.index.merge.threshold": 0.1
,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
,"hoodie.clustering.inline":"true"
,"hoodie.clustering.inline.max.commit":2
,"hoodie.clustering.inline.max.commits":2
,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"
,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
,"hoodie.clean.automatic": "true"
, "hoodie.clean.async": "true"
, "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
, "hoodie.cleaner.fileversions.retained": "3"
, "hoodie.cleaner.parallelism": '200'
, 'hoodie.cleaner.commits.retained': 2
}
data = DataGenerator.get_data(1000)
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card","year", "month"]
spark_df = spark.createDataFrame(data=data, schema=columns)
start = datetime.datetime.now()
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)
end = datetime.datetime.now()
print(f"Execution Time {end-start}")
""""
### Consistent Hashing Index is still an evolving feature and currently there are some limitations to use it as of 0.13.0:
###### This index is supported only for Spark engine using a MOR table.
* It does not work with metadata table enabled.
* To scale up or shrink the buckets, users have to manually trigger clustering using above configs (at some cadence), but they cannot have compaction concurrently running.
So, if compaction is enabled with your regular write pipeline, please follow this recommendation: You can choose to trigger the scale/shrink once every 12 hours. In such cases, once every 12 hours, you might need to disable compaction, stop your write pipeline and enable clustering. You should take extreme care to not run both concurrently because it might result in conflicts and a failed pipeline. Once clustering is complete, you can resume your regular write pipeline, which will have compaction enabled.
"""
```
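For readers unfamiliar with the idea in the quoted release notes, here is a minimal sketch of how a consistent-hashing layout can split one bucket while leaving the others untouched. It is not Hudi's implementation: the ring size, the md5-based hash, and the `ConsistentBuckets` class are all assumptions made for illustration.

```python
import bisect
import hashlib

HASH_SPACE = 2 ** 32  # assumed ring size for this sketch


def key_hash(record_key: str) -> int:
    """Map a record key to a ring position (md5 is an arbitrary choice here)."""
    digest = hashlib.md5(record_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % HASH_SPACE


class ConsistentBuckets:
    """Hypothetical layout: each bucket owns the hash range ending at its upper bound."""

    def __init__(self, num_buckets: int):
        step = HASH_SPACE // num_buckets
        # Sorted inclusive upper bounds; the last bucket always ends at HASH_SPACE - 1.
        self.bounds = [step * (i + 1) - 1 for i in range(num_buckets - 1)]
        self.bounds.append(HASH_SPACE - 1)

    def bucket_for(self, record_key: str) -> int:
        """Return the upper bound of the bucket that owns this key."""
        idx = bisect.bisect_left(self.bounds, key_hash(record_key))
        return self.bounds[idx]

    def split(self, bound: int) -> int:
        """Split one bucket in half by inserting a new bound in the middle of its range."""
        idx = self.bounds.index(bound)
        lower = self.bounds[idx - 1] + 1 if idx > 0 else 0
        if lower >= bound:
            raise ValueError("bucket range is too small to split")
        middle = (lower + bound) // 2
        self.bounds.insert(idx, middle)
        return middle


keys = [f"emp-{i}" for i in range(1000)]
buckets = ConsistentBuckets(4)
before = {k: buckets.bucket_for(k) for k in keys}

target = buckets.bounds[0]          # pretend this bucket grew too large
new_bound = buckets.split(target)
after = {k: buckets.bucket_for(k) for k in keys}

# Only keys that lived in the split bucket are reassigned.
moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved)} of {len(keys)} keys moved to the new bucket")
```

The point of the design is visible in `moved`: every reassigned key came from the one bucket that was split, which is why resizing does not require rewriting the whole table.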
#### The code works fine the first time I run it; when I run the same code again, it throws an error
```
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<timed exec> in <module>
~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in save(self, path, format, mode, partitionBy, **options)
966 self._jwrite.save()
967 else:
--> 968 self._jwrite.save(path)
969
970 @since(1.4)
~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1319
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1323
~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o129.save.
: java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
at org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy.performClusteringWithRecordsAsRow(SparkConsistentBucketClusteringExecutionStrategy.java:67)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 6 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1473207550
Maybe try `upsert` instead of `append`. The consistent hashing BUCKET index does not support the Spark row interface.
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476521258
Any idea why I am seeing this error?
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476357236
### Complete Code for test cases
```
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    # Imported last so the module (used below as datetime.datetime.now()) is not shadowed.
    import datetime
except ImportError as e:
    # Stop here: nothing below can run if a dependency is missing.
    raise SystemExit(f"Missing dependency: {e}")
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
db_name = "hudidb"
table_name = "hudi_bucket_consistent_hasing_test"
recordkey = 'emp_id,state'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "ts"
method = 'upsert'
table_type = "MERGE_ON_READ"
BUCKET_INDEX_HASH_FEILD = 'state'
PARTITION_FIELD = 'year'
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': table_type,
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.precombine.field': precombine
,"hoodie.datasource.write.row.writer.enable":"false"
,"hoodie.index.type":"BUCKET"
,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
,'hoodie.bucket.index.max.num.buckets':128
,'hoodie.bucket.index.min.num.buckets':32
,"hoodie.bucket.index.num.buckets":4
## do split if the bucket size reaches 1.5 * max_file_size
,"hoodie.bucket.index.split.threshold":1.5
## do merge if the bucket size is smaller than 0.1 * max_file_size
,"hoodie.bucket.index.merge.threshold": 0.1
,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
,"hoodie.clustering.inline":"true"
,"hoodie.clustering.inline.max.commit":2
,"hoodie.clustering.inline.max.commits":2
,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"
,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
,"hoodie.clean.automatic": "true"
, "hoodie.clean.async": "true"
, "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
, "hoodie.cleaner.fileversions.retained": "3"
, "hoodie.cleaner.parallelism": '200'
, 'hoodie.cleaner.commits.retained': 2
}
# DataGenerator is the class defined in the script from the original issue.
for i in range(1, 5):
    print("Batch :{} ".format(i))
    data = DataGenerator.get_data(1000)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    start = datetime.datetime.now()
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
    end = datetime.datetime.now()
    print(f"Execution Time {end-start}")
```
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1473939179
Can you share a code snippet? I am not sure I understand.
I already use the UPSERT method.
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475446095
Yes, the config you use is correct; it is the same as the one posted in the release notes.
However, consistent hashing does not support the `row` interface, as the error message says. To get around the `row` interface, use upsert mode.
```
for i in range(1, 5):
    data = DataGenerator.get_data(1000)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    start = datetime.datetime.now()
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("upsert"). \
        save(path)
    end = datetime.datetime.now()
    print(f"Execution Time {end-start}")
```
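One caveat for anyone copying the snippet: Spark's `DataFrameWriter.mode()` accepts only save modes such as `append` and `overwrite`, so `mode("upsert")` is rejected by Spark itself. In Hudi, the upsert is selected through `hoodie.datasource.write.operation`, and the row-based path seen in the stack trace appears to be controlled by `hoodie.datasource.write.row.writer.enable` (the key the poster sets in the test script elsewhere in this thread). A minimal sketch under those assumptions, where `disable_row_writer` is a hypothetical helper and not part of Hudi:

```python
def disable_row_writer(options: dict) -> dict:
    """Return a copy of the Hudi write options that avoids the row-based write path."""
    patched = dict(options)
    # Key taken from the poster's test script in this thread; its effect is assumed here.
    patched["hoodie.datasource.write.row.writer.enable"] = "false"
    # The Hudi write operation, not the Spark save mode, selects upsert.
    patched["hoodie.datasource.write.operation"] = "upsert"
    return patched


# Shortened stand-in for the hudi_options dict posted in the issue.
base_options = {
    "hoodie.table.name": "hudi_bucket_consistent_hasing_index",
    "hoodie.index.type": "BUCKET",
    "hoodie.index.bucket.engine": "CONSISTENT_HASHING",
}
write_options = disable_row_writer(base_options)
print(write_options)

# With a live SparkSession and the DataFrame from the loop above, the write would be:
# spark_df.write.format("hudi").options(**write_options).mode("append").save(path)
```

Returning a patched copy keeps the original dictionary intact, so the same base options can be reused for other experiments.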
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1478470507
Here are the video and exercise files:
https://soumilshah1995.blogspot.com/2023/03/topic-consistent-hashing-rfc-42.html
https://www.youtube.com/watch?v=zN8JOBKXxP0
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477868610
Works like a charm, thanks. I will make a video for the community on my YouTube channel.
Thank you for helping me and looking into the issue.
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475271371
Hi,
When I try to perform an append, it works fine the first time and throws an error the next time.
Here is the complete code:
```
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from functools import reduce
    from faker import Faker
    # Imported last so the module (used below as datetime.datetime.now()) is not shadowed.
    import datetime
except ImportError as e:
    # Stop here: nothing below can run if a dependency is missing.
    raise SystemExit(f"Missing dependency: {e}")
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
# Single Faker instance, named `fake` so it does not shadow the faker module.
fake = Faker()


class DataGenerator(object):

    @staticmethod
    def get_data(samples):
        # Each tuple matches the `columns` list used when building the DataFrame.
        return [
            (
                str(uuid.uuid4()),
                fake.name(),
                fake.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                fake.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(fake.random_int(min=10000, max=150000)),
                str(fake.random_int(min=18, max=60)),
                str(fake.random_int(min=0, max=100000)),
                str(fake.unix_time()),
                fake.email(),
                fake.credit_card_number(card_type='amex'),
                fake.year(),
                fake.month()
            ) for _ in range(samples)
        ]
db_name = "hudidb"
table_name = "hudi_bucket_consistent_hasing_index"
recordkey = 'emp_id,state'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "ts"
method = 'upsert'
table_type = "MERGE_ON_READ"
BUCKET_INDEX_HASH_FEILD = 'state'
PARTITION_FIELD = 'year'
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': table_type,
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.precombine.field': precombine
,"hoodie.index.type":"BUCKET"
,"hoodie.index.bucket.engine" : 'CONSISTENT_HASHING'
,'hoodie.bucket.index.max.num.buckets':128
,'hoodie.bucket.index.min.num.buckets':32
,"hoodie.bucket.index.num.buckets":4
## do split if the bucket size reaches 1.5 * max_file_size
,"hoodie.bucket.index.split.threshold":1.5
## do merge if the bucket size is smaller than 0.1 * max_file_size
,"hoodie.bucket.index.merge.threshold": 0.1
,"hoodie.datasource.write.partitionpath.field":PARTITION_FIELD
,"hoodie.clustering.inline":"true"
,"hoodie.clustering.inline.max.commit":2
,"hoodie.clustering.inline.max.commits":2
,"hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824"
,"hoodie.clustering.plan.strategy.small.file.limit":"629145600"
,"hoodie.clustering.plan.strategy.class":"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy"
,"hoodie.clustering.execution.strategy.class":"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy"
,"hoodie.clustering.updates.strategy":"org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy"
,"hoodie.clean.automatic": "true"
, "hoodie.clean.async": "true"
, "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS'
, "hoodie.cleaner.fileversions.retained": "3"
, "hoodie.cleaner.parallelism": '200'
, 'hoodie.cleaner.commits.retained': 2
}
for i in range(1, 5):
    data = DataGenerator.get_data(1000)
    columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "year", "month"]
    spark_df = spark.createDataFrame(data=data, schema=columns)
    start = datetime.datetime.now()
    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)
    end = datetime.datetime.now()
    print(f"Execution Time {end-start}")
```
#### Error Messages
```
Py4JJavaError: An error occurred while calling o180.save.
: java.util.concurrent.CompletionException: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1311)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1840)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1806)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: org.apache.hudi.exception.HoodieClusteringException: Not implement yet
at org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy.performClusteringWithRecordsAsRow(SparkConsistentBucketClusteringExecutionStrategy.java:67)
at org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsyncAsRow$6(MultipleSparkJobExecutionStrategy.java:249)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 6 more
```
#### Please let me know if I am doing something wrong here.
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477844690
Sure, I will try it.
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476354557
I got an error:
```
Py4JJavaError: An error occurred while calling o621.save.
: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback file:///C:/tmp/hudidb/hudi_bucket_consistent_hasing_index/.hoodie/metadata commits 20230320103632686
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:823)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:727)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:711)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollbackFailedWrites(BaseHoodieTableServiceClient.java:706)
at org.apache.hudi.client.BaseHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(BaseHoodieWriteClient.java:836)
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:156)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:835)
at org.apache.hudi.client.BaseHoodieWriteClient.startCommitWithTime(BaseHoodieWriteClient.java:820)
at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:165)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:830)
at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:897)
at org.apache.hudi.client.BaseHoodieWriteClient.lambda$writeTableMetadata$1(BaseHoodieWriteClient.java:355)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
at org.apache.hudi.client.BaseHoodieWriteClient.writeTableMetadata(BaseHoodieWriteClient.java:355)
at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:282)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:233)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:102)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:945)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:372)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1589)
Caused by: org.apache.hudi.exception.HoodieIOException: Could not delete instant [==>20230320103632686__deltacommit__REQUESTED]
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:300)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deletePending(HoodieActiveTimeline.java:242)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.deleteInflightAndRequestedInstant(BaseRollbackActionExecutor.java:286)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:250)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:114)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:135)
at org.apache.hudi.table.HoodieSparkMergeOnReadTable.rollback(HoodieSparkMergeOnReadTable.java:194)
at org.apache.hudi.client.BaseHoodieTableServiceClient.rollback(BaseHoodieTableServiceClient.java:806)
... 58 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477817743
Adding `"hoodie.metadata.enable": "false"` works fine in my local env. Could you try again with this additional config?
By the way, your initial bucket number `hoodie.bucket.index.num.buckets: 4` is smaller than `hoodie.bucket.index.min.num.buckets=32`, so the min config will be overwritten by the initial bucket number (i.e., 4).
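As a minimal sketch of the two points above, the options below disable the metadata table and keep the initial bucket count at or above the configured minimum. The three keys quoted in this comment come from the thread; the index type and engine keys are written from memory of the Hudi 0.13 configuration reference and should be checked against it, the values are illustrative, and `check_bucket_bounds` is a hypothetical helper, not part of Hudi.

```python
# Illustrative option set for a consistent-hashing MOR table (values are assumptions).
hudi_options = {
    "hoodie.index.type": "BUCKET",
    "hoodie.index.bucket.engine": "CONSISTENT_HASHING",
    "hoodie.bucket.index.num.buckets": "32",
    "hoodie.bucket.index.min.num.buckets": "32",
    # Metadata table disabled, as suggested in this comment.
    "hoodie.metadata.enable": "false",
}


def check_bucket_bounds(options):
    """Hypothetical helper: True when the initial bucket count is not below the minimum."""
    initial = int(options["hoodie.bucket.index.num.buckets"])
    minimum = int(options["hoodie.bucket.index.min.num.buckets"])
    return initial >= minimum


print(check_bucket_bounds(hudi_options))
```

With the values from the original report (initial 4, minimum 32) the helper would return False, which is the mismatch pointed out above.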
--
[GitHub] [hudi] soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
URL: https://github.com/apache/hudi/issues/8207
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476193225
Hi,
```
mode("upsert"). \
```
I don't think this is a valid mode:
IllegalArgumentException: Unknown save mode: upsert. Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists', 'default'.
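The exception shows that Spark's `mode()` accepts only its own save modes. A minimal sketch of keeping the two settings apart follows, assuming the Hudi write operation is chosen through the `hoodie.datasource.write.operation` option rather than through `mode()`; `write_settings` is a hypothetical helper for illustration only.

```python
# Save modes listed in the IllegalArgumentException quoted above.
SPARK_SAVE_MODES = {"overwrite", "append", "ignore", "error", "errorifexists", "default"}


def write_settings(operation, mode="append"):
    """Hypothetical helper: return (options, mode), keeping the Hudi operation out of mode()."""
    if mode not in SPARK_SAVE_MODES:
        raise ValueError(f"Unknown save mode: {mode}")
    return {"hoodie.datasource.write.operation": operation}, mode


options, mode = write_settings("upsert")
# With a real DataFrame this would be used roughly as:
#   spark_df.write.format("hudi").options(**hudi_options, **options).mode(mode).save(path)
```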
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476329201
That worked, thanks. I will be making a video for the community :D
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476323878
sure i will try that now
--
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477155606
It seems related to the metadata table, which should be disabled in the consistent hashing case. I will test your code in my local env and get back to you later today.
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1477759103
Thanks a lot :D
--
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1475187308
```
start = datetime.datetime.now()
spark_df.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
save(path)
```
In the above code you posted, maybe change mode `append` to `upsert`?
--
[GitHub] [hudi] soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 closed issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
URL: https://github.com/apache/hudi/issues/8207
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476529101
I have attached the full code so you can try it at your end if needed, @YuweiXiao.
--
[GitHub] [hudi] soumilshah1995 commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476186701
got it let me try
--
[GitHub] [hudi] YuweiXiao commented on issue #8207: [SUPPORT] Hudi 0.13 Consistent Hashing Issue for MOR Tables
Posted by "YuweiXiao (via GitHub)" <gi...@apache.org>.
YuweiXiao commented on issue #8207:
URL: https://github.com/apache/hudi/issues/8207#issuecomment-1476214336
Oh yes... sorry for the mistake. Did you explicitly enable `hoodie.datasource.write.row.writer.enable`?
Could you try putting it into your config and setting it to false?
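A small sketch of that suggestion, assuming the writer options are held in a plain Python dict as in the code posted in this issue; `with_row_writer_disabled` is a hypothetical helper, and only the config key itself comes from this comment.

```python
def with_row_writer_disabled(options):
    """Hypothetical helper: copy the options and set the row-writer flag to false."""
    updated = dict(options)
    updated["hoodie.datasource.write.row.writer.enable"] = "false"
    return updated


# Illustrative starting options; the real dict would carry the full table config.
base_options = {"hoodie.datasource.write.operation": "upsert"}
hudi_options = with_row_writer_disabled(base_options)
```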
--