Posted to commits@hudi.apache.org by "soumilshah1995 (via GitHub)" <gi...@apache.org> on 2023/04/06 17:30:19 UTC

[GitHub] [hudi] soumilshah1995 opened a new issue, #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

soumilshah1995 opened a new issue, #8400:
URL: https://github.com/apache/hudi/issues/8400

   Subject: Need help with offline compaction for MOR tables
   
   Good afternoon, and I hope you are doing well. I would like some assistance for the next piece of content I am creating on Hudi offline compaction for MOR tables.
   After searching and reading, I am looking for guidance on how to submit an offline compaction job and whether I am missing anything.
   Attaching sample code:
   
   ```
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv(".env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ------------------Hudi settings ---------------------------------------------
       glue_db = "hudi_db"
       table_name = "invoice"
       path = "s3://delta-streamer-demo-hudi/hudi/"
   
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_{}'.format(table_name)
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.jars=/usr/lib/hudi/hudi-utilities-bundle.jar'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor /usr/lib/hudi/hudi-utilities-bundle.jar'
   
       arguments = [
           '--spark-memory', '1g',
           '--parallelism', '2',
           "--mode", "schedule",
           "--base-path", path,
           "--table-name", table_name
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': "command-runner.jar",
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   ![image](https://user-images.githubusercontent.com/39345855/230452917-0cb4019d-4334-406a-bed9-1e9c5efca505.png)
   
   
   # Error
   ```
   Job failed, please check complete logs in configured logging destination. ExitCode: 1. Last few exceptions: Exception in thread "main" org.apache.hudi.com.beust.jcommander.ParameterException: Was passed main parameter 'command-runner.jar' but no main parameter was defined in your arg
   ```
   
   Looking forward to your guidance.
   
   References:
   https://hudi.apache.org/docs/compaction/
   https://github.com/apache/hudi/issues/6903
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1516283337

   Hello, I tested it, and it looks like I get a different error. Here are the steps:
   
   # Glue Job (tested OK)
   ```
   """
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog --conf spark.sql.legacy.pathOptionBehavior.enabled=true
   
   """
   try:
       import sys
       import os
       from pyspark.context import SparkContext
       from pyspark.sql.session import SparkSession
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from pyspark.sql.types import *
       from datetime import datetime, date
       import boto3
       from functools import reduce
       from pyspark.sql import Row
   
       import uuid
       from faker import Faker
   except Exception as e:
       print("Modules are missing : {} ".format(e))
   
   spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   # =================================INSERTING DATA =====================================
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
   
               ) for x in range(10000)
           ]
   
   
   # ============================== Settings =======================================
   db_name = "hudidb"
   table_name = "employees"
   recordkey = 'emp_id'
   precombine = "ts"
   PARTITION_FIELD = 'state'
   path = "s3://delta-streamer-demo-hudi/hudi/"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   # ====================================================================================
   
   hudi_part_write_config = {
       'className': 'org.apache.hudi',
   
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.precombine.field': precombine,
       "hoodie.schema.on.read.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true",
   
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.support_timestamp': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name
   
       , "hoodie.clean.automatic": "false"
       , "hoodie.clean.async": "false"
       , "hoodie.clustering.async.enabled": "false"
       , "hoodie.metadata.enable": "false"
       , "hoodie.metadata.index.async": "false"
       , "hoodie.metadata.index.column.stats.enable": "false"
       , "hoodie.compact.inline": "false"
       , 'hoodie.compact.schedule.inline': 'true'
   
       , "hoodie.metadata.index.check.timeout.seconds": "60"
       , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
       , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
   
   }
   
   for i in range(0, 5):
       data = DataGenerator.get_data()
       columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
       spark_df = spark.createDataFrame(data=data, schema=columns)
       spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   ```
   
   # EMR Serverless Job 
   ```
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ============================== Settings =======================================
       db_name = "hudidb"
       table_name = "employees"
       recordkey = 'emp_id'
       precombine = "ts"
       PARTITION_FIELD = 'state'
       path = "s3a://delta-streamer-demo-hudi/hudi/"
       method = 'upsert'
       table_type = "MERGE_ON_READ"
       # ====================================================================================
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_compaction_{}'.format(table_name)
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
       jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
       # schedule | execute | scheduleAndExecute
   
       arguments = [
           '--spark-memory', '5g',
           '--parallelism', '2',
           "--mode", "scheduleAndExecute",
           "--base-path", path,
           "--table-name", table_name
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': jar_path,
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   # Output Logs
   ```
   23/04/20 12:55:33 INFO SparkContext: Running Spark version 3.3.1-amzn-0
   23/04/20 12:55:33 INFO ResourceUtils: ==============================================================
   23/04/20 12:55:33 INFO ResourceUtils: No custom resources configured for spark.driver.
   23/04/20 12:55:33 INFO ResourceUtils: ==============================================================
   23/04/20 12:55:33 INFO SparkContext: Submitted application: compactor-employees
   23/04/20 12:55:33 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 5120, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
   23/04/20 12:55:33 INFO ResourceProfile: Limiting resource is cpus at 4 tasks per executor
   23/04/20 12:55:33 INFO ResourceProfileManager: Added ResourceProfile id: 0
   23/04/20 12:55:33 INFO SecurityManager: Changing view acls to: hadoop
   23/04/20 12:55:33 INFO SecurityManager: Changing modify acls to: hadoop
   23/04/20 12:55:33 INFO SecurityManager: Changing view acls groups to: 
   23/04/20 12:55:33 INFO SecurityManager: Changing modify acls groups to: 
   23/04/20 12:55:33 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
   23/04/20 12:55:33 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
   23/04/20 12:55:33 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
   23/04/20 12:55:33 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
   23/04/20 12:55:33 INFO Utils: Successfully started service 'sparkDriver' on port 33507.
   23/04/20 12:55:33 INFO SparkEnv: Registering MapOutputTracker
   23/04/20 12:55:33 INFO SparkEnv: Registering BlockManagerMaster
   23/04/20 12:55:33 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
   23/04/20 12:55:33 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
   23/04/20 12:55:33 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
   23/04/20 12:55:33 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-78f669b6-62b6-4ec2-9e2b-b2314a1273fd
   23/04/20 12:55:33 INFO MemoryStore: MemoryStore started with capacity 7.3 GiB
   23/04/20 12:55:33 INFO SparkEnv: Registering OutputCommitCoordinator
   23/04/20 12:55:33 INFO SubResultCacheManager: Sub-result caches are disabled.
   23/04/20 12:55:33 INFO Utils: Successfully started service 'SparkUI' on port 8090.
   23/04/20 12:55:33 INFO SparkContext: Added JAR s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar at s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1681995333013
   23/04/20 12:55:34 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
   23/04/20 12:55:34 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
   23/04/20 12:55:34 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43873.
   23/04/20 12:55:34 INFO NettyBlockTransferService: Server created on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873
   23/04/20 12:55:34 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
   23/04/20 12:55:34 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
   23/04/20 12:55:34 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 with 7.3 GiB RAM, BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
   23/04/20 12:55:34 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
   23/04/20 12:55:34 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, [2600:1f18:7aa7:3005:405:dccd:66cb:96e9], 43873, None)
   23/04/20 12:55:34 INFO ExecutorContainerAllocator: Going to request 3 executors for ResourceProfile Id: 0, target: 3 already provisioned: 0.
   23/04/20 12:55:34 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(1, 2, 3)
   23/04/20 12:55:34 INFO SingleEventLogFileWriter: Logging events to file:/var/log/spark/apps/00f9gpl7uiklu609.inprogress
   23/04/20 12:55:34 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
   23/04/20 12:55:34 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service.
   23/04/20 12:55:34 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
   23/04/20 12:55:34 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(2 -> 9ac3cf5d-cd55-40b5-9c7d-2491d16c9635, 1 -> f6c3cf5d-cd69-2a09-dace-aefa04b3afe6, 3 -> f6c3cf5d-cd5d-e6cb-fc14-9227dfc49d91)
   23/04/20 12:55:38 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:2901:ec5e:c073:b610:43210) with ID 3,  ResourceProfileId 0
   23/04/20 12:55:38 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:cd0:80eb:6605:593:49078) with ID 2,  ResourceProfileId 0
   23/04/20 12:55:38 INFO ExecutorMonitor: New executor 3 has registered (new total is 1)
   23/04/20 12:55:38 INFO ExecutorMonitor: New executor 2 has registered (new total is 2)
   23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 with 2.7 GiB RAM, BlockManagerId(3, [2600:1f18:7aa7:3005:2901:ec5e:c073:b610], 38209, None)
   23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 with 2.7 GiB RAM, BlockManagerId(2, [2600:1f18:7aa7:3005:cd0:80eb:6605:593], 40479, None)
   23/04/20 12:55:39 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:7aa7:3005:463c:a178:348b:c839:38048) with ID 1,  ResourceProfileId 0
   23/04/20 12:55:39 INFO ExecutorMonitor: New executor 1 has registered (new total is 3)
   23/04/20 12:55:39 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:7aa7:3005:463c:a178:348b:c839]:45689 with 2.7 GiB RAM, BlockManagerId(1, [2600:1f18:7aa7:3005:463c:a178:348b:c839], 45689, None)
   23/04/20 12:55:39 INFO EmrServerlessClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
   23/04/20 12:55:39 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
   23/04/20 12:55:39 INFO MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
   23/04/20 12:55:39 INFO MetricsSystemImpl: s3a-file-system metrics system started
   23/04/20 12:55:39 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
   23/04/20 12:55:39 WARN HoodieCompactor: No instant time is provided for scheduling compaction.
   23/04/20 12:55:40 WARN HoodieBackedTableMetadata: Metadata table was not found at path s3a://delta-streamer-demo-hudi/hudi/.hoodie/metadata
   23/04/20 12:55:40 WARN HoodieBackedTableMetadata: Metadata table was not found at path s3a://delta-streamer-demo-hudi/hudi/.hoodie/metadata
   23/04/20 12:55:40 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:137
   23/04/20 12:55:40 INFO DAGScheduler: Got job 0 (collect at HoodieSparkEngineContext.java:137) with 1 output partitions
   23/04/20 12:55:40 INFO DAGScheduler: Final stage: ResultStage 0 (collect at HoodieSparkEngineContext.java:137)
   23/04/20 12:55:40 INFO DAGScheduler: Parents of final stage: List()
   23/04/20 12:55:40 INFO DAGScheduler: Missing parents: List()
   23/04/20 12:55:40 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at flatMap at HoodieSparkEngineContext.java:137), which has no missing parents
   23/04/20 12:55:40 INFO ExecutorContainerAllocator: Set total expected execs to {0=1}
   23/04/20 12:55:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.6 KiB, free 7.3 GiB)
   23/04/20 12:55:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 39.5 KiB, free 7.3 GiB)
   23/04/20 12:55:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 39.5 KiB, free: 7.3 GiB)
   23/04/20 12:55:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1570
   23/04/20 12:55:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at flatMap at HoodieSparkEngineContext.java:137) (first 15 tasks are for partitions Vector(0))
   23/04/20 12:55:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
   23/04/20 12:55:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 0, PROCESS_LOCAL, 4393 bytes) taskResourceAssignments Map()
   23/04/20 12:55:42 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 (size: 39.5 KiB, free: 2.7 GiB)
   23/04/20 12:55:43 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1889 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (1/1)
   23/04/20 12:55:43 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
   23/04/20 12:55:43 INFO DAGScheduler: ResultStage 0 (collect at HoodieSparkEngineContext.java:137) finished in 3.089 s
   23/04/20 12:55:43 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
   23/04/20 12:55:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
   23/04/20 12:55:43 INFO DAGScheduler: Job 0 finished: collect at HoodieSparkEngineContext.java:137, took 3.150595 s
   23/04/20 12:55:43 INFO ExecutorContainerAllocator: Set total expected execs to {0=0}
   23/04/20 12:55:43 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:103
   23/04/20 12:55:43 INFO DAGScheduler: Got job 1 (collect at HoodieSparkEngineContext.java:103) with 7 output partitions
   23/04/20 12:55:43 INFO DAGScheduler: Final stage: ResultStage 1 (collect at HoodieSparkEngineContext.java:103)
   23/04/20 12:55:43 INFO DAGScheduler: Parents of final stage: List()
   23/04/20 12:55:43 INFO DAGScheduler: Missing parents: List()
   23/04/20 12:55:43 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[3] at map at HoodieSparkEngineContext.java:103), which has no missing parents
   23/04/20 12:55:43 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 107.3 KiB, free 7.3 GiB)
   23/04/20 12:55:43 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 39.5 KiB, free 7.3 GiB)
   23/04/20 12:55:43 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 39.5 KiB, free: 7.3 GiB)
   23/04/20 12:55:43 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1570
   23/04/20 12:55:43 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 1 (MapPartitionsRDD[3] at map at HoodieSparkEngineContext.java:103) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6))
   23/04/20 12:55:43 INFO TaskSchedulerImpl: Adding task set 1.0 with 7 tasks resource profile 0
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 0, PROCESS_LOCAL, 4701 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 1, PROCESS_LOCAL, 4702 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 2, PROCESS_LOCAL, 4702 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 3, PROCESS_LOCAL, 4703 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 4.0 in stage 1.0 (TID 5) ([2600:1f18:7aa7:3005:2901:ec5e:c073:b610], executor 3, partition 4, PROCESS_LOCAL, 4652 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 5.0 in stage 1.0 (TID 6) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 5, PROCESS_LOCAL, 4701 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO TaskSetManager: Starting task 6.0 in stage 1.0 (TID 7) ([2600:1f18:7aa7:3005:463c:a178:348b:c839], executor 1, partition 6, PROCESS_LOCAL, 4592 bytes) taskResourceAssignments Map()
   23/04/20 12:55:43 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610]:38209 (size: 39.5 KiB, free: 2.7 GiB)
   23/04/20 12:55:43 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 107 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (1/7)
   23/04/20 12:55:43 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 109 ms on [2600:1f18:7aa7:3005:2901:ec5e:c073:b610] (executor 3) (2/7)
   23/04/20 12:55:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 (size: 39.5 KiB, free: 2.7 GiB)
   23/04/20 12:55:44 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on [2600:1f18:7aa7:3005:463c:a178:348b:c839]:45689 (size: 39.5 KiB, free: 2.7 GiB)
   23/04/20 12:55:45 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 1688 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (3/7)
   23/04/20 12:55:45 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 1692 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (4/7)
   23/04/20 12:55:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2111 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (5/7)
   23/04/20 12:55:45 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 2110 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (6/7)
   23/04/20 12:55:46 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2486 ms on [2600:1f18:7aa7:3005:463c:a178:348b:c839] (executor 1) (7/7)
   23/04/20 12:55:46 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
   23/04/20 12:55:46 INFO DAGScheduler: ResultStage 1 (collect at HoodieSparkEngineContext.java:103) finished in 2.514 s
   23/04/20 12:55:46 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
   23/04/20 12:55:46 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
   23/04/20 12:55:46 INFO DAGScheduler: Job 1 finished: collect at HoodieSparkEngineContext.java:103, took 2.524419 s
   23/04/20 12:55:46 INFO SparkContext: Starting job: collect at HoodieSparkEngineContext.java:137
   23/04/20 12:55:46 INFO DAGScheduler: Got job 2 (collect at HoodieSparkEngineContext.java:137) with 1 output partitions
   23/04/20 12:55:46 INFO DAGScheduler: Final stage: ResultStage 2 (collect at HoodieSparkEngineContext.java:137)
   23/04/20 12:55:46 INFO DAGScheduler: Parents of final stage: List()
   23/04/20 12:55:46 INFO DAGScheduler: Missing parents: List()
   23/04/20 12:55:46 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[5] at flatMap at HoodieSparkEngineContext.java:137), which has no missing parents
   23/04/20 12:55:46 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 369.6 KiB, free 7.3 GiB)
   23/04/20 12:55:46 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 135.1 KiB, free 7.3 GiB)
   23/04/20 12:55:46 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on [2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:43873 (size: 135.1 KiB, free: 7.3 GiB)
   23/04/20 12:55:46 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1570
   23/04/20 12:55:46 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[5] at flatMap at HoodieSparkEngineContext.java:137) (first 15 tasks are for partitions Vector(0))
   23/04/20 12:55:46 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
   23/04/20 12:55:46 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 8) ([2600:1f18:7aa7:3005:cd0:80eb:6605:593], executor 2, partition 0, PROCESS_LOCAL, 4332 bytes) taskResourceAssignments Map()
   23/04/20 12:55:46 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on [2600:1f18:7aa7:3005:cd0:80eb:6605:593]:40479 (size: 135.1 KiB, free: 2.7 GiB)
   23/04/20 12:55:47 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 8) in 650 ms on [2600:1f18:7aa7:3005:cd0:80eb:6605:593] (executor 2) (1/1)
   23/04/20 12:55:47 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
   23/04/20 12:55:47 INFO DAGScheduler: ResultStage 2 (collect at HoodieSparkEngineContext.java:137) finished in 0.685 s
   23/04/20 12:55:47 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
   23/04/20 12:55:47 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
   23/04/20 12:55:47 INFO DAGScheduler: Job 2 finished: collect at HoodieSparkEngineContext.java:137, took 0.689851 s
   23/04/20 12:55:47 WARN BaseHoodieCompactionPlanGenerator: No operations are retrieved for s3a://delta-streamer-demo-hudi/hudi
   23/04/20 12:55:47 WARN HoodieCompactor: Couldn't do schedule
   23/04/20 12:55:47 INFO SparkUI: Stopped Spark web UI at http://[2600:1f18:7aa7:3005:405:dccd:66cb:96e9]:8090
   23/04/20 12:55:47 INFO EmrServerlessClusterSchedulerBackend: Shutting down all executors
   23/04/20 12:55:47 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Asking each executor to shut down
   23/04/20 12:55:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   23/04/20 12:55:47 INFO MemoryStore: MemoryStore cleared
   23/04/20 12:55:47 INFO BlockManager: BlockManager stopped
   23/04/20 12:55:47 INFO BlockManagerMaster: BlockManagerMaster stopped
   23/04/20 12:55:47 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   23/04/20 12:55:47 INFO SparkContext: Successfully stopped SparkContext
   23/04/20 12:55:47 INFO ShutdownHookManager: Shutdown hook called
   23/04/20 12:55:47 INFO ShutdownHookManager: Deleting directory /tmp/spark-f0bb3006-aabf-4fc4-b3d0-5d9223ed8f68
   23/04/20 12:55:47 INFO ShutdownHookManager: Deleting directory /tmp/spark-19f096f4-4b39-4f2d-a97e-71b7e7d43e1d
   23/04/20 12:55:47 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
   23/04/20 12:55:47 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
   23/04/20 12:55:47 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
   ```
   ![image](https://user-images.githubusercontent.com/39345855/233373278-4e425ad9-cb65-4833-a32b-cbad31dedcaf.png)
   




[GitHub] [hudi] ad1happy2go commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1513174914

   @soumilshah1995 Did we try just using `hoodie.table.services.enabled` to disable all table services (archive, clean, compact, cluster)? You want to disable three of them in your configs anyway.
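   
   For illustration, a minimal sketch of that idea (assuming the Hudi 0.13.0 bundle used in this thread, where `hoodie.table.services.enabled` is available): the block of individual flags in the Glue writer config above collapses into a single switch.
   
   ```
   # Hypothetical variant of the Glue writer config shown earlier in this thread:
   # instead of turning clean / clustering / metadata / inline compaction off one
   # by one, a single switch disables all table services so the offline
   # HoodieCompactor job can schedule and execute compaction on its own.
   hudi_part_write_config = {
       'className': 'org.apache.hudi',
       'hoodie.table.name': 'employees',
       'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.recordkey.field': 'emp_id',
       'hoodie.datasource.write.precombine.field': 'ts',
   
       # single switch suggested above (covers archive, clean, compact, cluster)
       'hoodie.table.services.enabled': 'false',
   }
   ```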




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1522438317

   No, it is the same error we talked about on the call yesterday; I will paste the screenshot.
   ![image](https://user-images.githubusercontent.com/39345855/234407655-20de6917-383b-4cf1-b0ae-920d3ba3fc1b.png)
   




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1516301202

   Looks like I am missing a setting. Can you point out what I am missing?




[GitHub] [hudi] AmareshB commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "AmareshB (via GitHub)" <gi...@apache.org>.
AmareshB commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1603287780

   @soumilshah1995 
   
   ```
   hoodie.compact.inline.max.delta.commits
   Number of delta commits after the last compaction, before scheduling of a new compaction is attempted.
   Default Value: 5 (Optional)
   Config Param: INLINE_COMPACT_NUM_DELTA_COMMITS
   ```
   
   
   Try setting this param to 1, or add more commits; you should then see a `compaction.requested` file in the `.hoodie` dir, as long as `'hoodie.compact.schedule.inline': 'true'` is set.
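   
   For example, a minimal, hedged sketch of passing that property to the offline HoodieCompactor job (the same `--hoodie-conf` pattern used elsewhere in this thread), so that a plan can be scheduled after a single delta commit:
   
   ```
   # Hypothetical arguments for org.apache.hudi.utilities.HoodieCompactor:
   # lowering the delta-commit threshold so compaction is scheduled after one
   # delta commit instead of the default of five.
   arguments = [
       '--spark-memory', '5g',
       '--parallelism', '2',
       '--mode', 'scheduleAndExecute',   # schedule | execute | scheduleAndExecute
       '--base-path', 's3://delta-streamer-demo-hudi/hudi/',
       '--table-name', 'employees',
       '--hoodie-conf', 'hoodie.compact.schedule.inline=true',
       '--hoodie-conf', 'hoodie.compact.inline.max.delta.commits=1',
   ]
   ```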
   




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1514669007

   I'm not sure why, but every time I try to connect, I get the error. Let's connect at 11:30. Perhaps if I show you my setup, you can tell me if I'm doing something incorrectly.




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1515543681

   I shall test this again over the weekend with the new JAR files.




[GitHub] [hudi] ad1happy2go commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1522876762

   I will try to reproduce this on my end then.




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1627173526

   After using this jar, I am now getting:
   ![image](https://github.com/apache/hudi/assets/39345855/6d05361c-f690-438d-819e-0046fa083271)
   
   
   I have given full permissions as well.
   Can you help?




[GitHub] [hudi] ad1happy2go commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1514444272

   @soumilshah1995 
   Here the error specifically says "command-runner" was not found. Can you try giving the jar path directly in 'sparkSubmit': {'entryPoint': ...}?
   
   Or can you try creating the serverless job directly first, instead of via Lambda, same as in https://github.com/apache/hudi/issues/8412?
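   
   For illustration, a minimal sketch of the jar-path suggestion (hedged; the S3 location below is a placeholder, and the other variables come from the scripts shown earlier in this thread):
   
   ```
   # Hypothetical jobDriver for the boto3 EMR Serverless start_job_run call shown
   # earlier in this thread: the Hudi utilities bundle uploaded to S3 is the entry
   # point (the S3 location below is a placeholder), and `arguments` /
   # `spark_submit_parameters` are the same variables defined in those scripts.
   jar_path = "s3://<your-bucket>/jars/hudi-utilities-bundle_2.12-0.13.0.jar"  # placeholder
   
   job_driver = {
       'sparkSubmit': {
           'entryPoint': jar_path,                           # jar instead of command-runner.jar
           'entryPointArguments': arguments,
           'sparkSubmitParameters': spark_submit_parameters,
       },
   }
   ```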




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1626323845

   Let me give it a try over the weekend.




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1626353821

   Now trying with a custom JAR: https://mvnrepository.com/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.0
   
   Let's see if this works.
   




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1636454180

   @AmareshB
   
   Sure.
   
   # Step 1: Create EMR 6.11 Cluster
   ![image](https://github.com/apache/hudi/assets/39345855/320ac005-344f-4da6-a02c-1cdad5462226)
   
   # Step 2: Create MOR table
   ```
   try:
       import sys
       import os
       from pyspark.context import SparkContext
       from pyspark.sql.session import SparkSession
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from pyspark.sql.types import *
       from datetime import datetime, date
       import boto3
       from functools import reduce
       from pyspark.sql import Row
   
       import uuid
       from faker import Faker
   except Exception as e:
       print("Modules are missing : {} ".format(e))
   
   spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   # =================================INSERTING DATA =====================================
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
   
               ) for x in range(5)
           ]
   
   
   # ============================== Settings =======================================
   db_name = "hudidb"
   table_name = "employees"
   recordkey = 'emp_id'
   precombine = "ts"
   PARTITION_FIELD = 'state'
   path = "s3://soumilshah-hudi-demos/hudi/"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   # ====================================================================================
   
   hudi_part_write_config = {
       'className': 'org.apache.hudi',
   
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.precombine.field': precombine,
       "hoodie.schema.on.read.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true",
   
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.support_timestamp': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name
   
       , "hoodie.compact.inline": "false"
       , 'hoodie.compact.schedule.inline': 'true'
       , "hoodie.metadata.index.check.timeout.seconds": "60"
       , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
       , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
   
   }
   
   
   # ====================================================
   """Create Spark Data Frame """
   # ====================================================
   data = DataGenerator.get_data()
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   df = spark.createDataFrame(data=data, schema=columns)
   df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)
   
   
   # ====================================================
   """APPEND """
   # ====================================================
   
   impleDataUpd = [
       (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
       (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
   ]
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   
   # ====================================================
   """UPDATE """
   # ====================================================
   impleDataUpd = [
       (3, "this is update 1** on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
   ]
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   ```
   
   # Step 3: Fire the job
   ```
   """
   https://github.com/apache/hudi/issues/8400
   """
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ============================== Settings =======================================
       table_name = "employees"
       recordkey = 'emp_id'
       precombine = "ts"
       path = "s3://soumilshah-hudi-demos/hudi/"
   
   
       # ====================================================================================
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'hudi_compaction_{}'.format(table_name)
       jar_path = "s3://soumilshah-hudi-demos/jar/hudi-spark3.3-bundle_2.12-0.13.0.jar"
   
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
       # schedule | execute | scheduleAndExecute
   
       arguments = [
           '--spark-memory', '5g',
           '--parallelism', '2',
           "--mode", "schedule",
           "--base-path", path,
           "--table-name", table_name,
           "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
           "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", "hoodie.compact.schedule.inline=true",
           "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': "command-runner.jar",
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   lambda_handler_test_emr(context=None, event=None)
   
   
   ```
   ![image](https://github.com/apache/hudi/assets/39345855/15f43e91-fc58-4b7e-8587-5615c55db6b5)
   
   # Now trying again with the custom jar
   
   ```
   """
   https://github.com/apache/hudi/issues/8400
   """
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ============================== Settings =======================================
       table_name = "employees"
       recordkey = 'emp_id'
       precombine = "ts"
       path = "s3://soumilshah-hudi-demos/hudi/"
   
   
       # ====================================================================================
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'hudi_compaction_{}'.format(table_name)
       jar_path = "s3://soumilshah-hudi-demos/jar/hudi-spark3.3-bundle_2.12-0.13.0.jar"
   
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
       # schedule | execute | scheduleAndExecute
   
       arguments = [
           '--spark-memory', '5g',
           '--parallelism', '2',
           "--mode", "schedule",
           "--base-path", path,
           "--table-name", table_name,
           "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
           "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", "hoodie.compact.schedule.inline=true",
           "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"
   
       ]
   
       # arguments = [
       #     '--spark-memory', '1g',
       #     '--parallelism', '2',
       #     "--mode", "schedule",
       #     "--base-path", path,
       #     "--table-name", table_name
       # ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': jar_path,
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   
   
   
   




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1626350128

   # Glue job 
   ```
   try:
       import sys
       import os
       from pyspark.context import SparkContext
       from pyspark.sql.session import SparkSession
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from pyspark.sql.types import *
       from datetime import datetime, date
       import boto3
       from functools import reduce
       from pyspark.sql import Row
   
       import uuid
       from faker import Faker
   except Exception as e:
       print("Modules are missing : {} ".format(e))
   
   spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   # =================================INSERTING DATA =====================================
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
   
               ) for x in range(5)
           ]
   
   
   # ============================== Settings =======================================
   db_name = "hudidb"
   table_name = "employees"
   recordkey = 'emp_id'
   precombine = "ts"
   PARTITION_FIELD = 'state'
   path = "s3://soumilshah-hudi-demos/hudi/"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   # ====================================================================================
   
   hudi_part_write_config = {
       'className': 'org.apache.hudi',
   
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.precombine.field': precombine,
       "hoodie.schema.on.read.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true",
   
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.support_timestamp': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name
   
       , "hoodie.compact.inline": "false"
       , 'hoodie.compact.schedule.inline': 'true'
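       # inline compaction is off and only the plan is scheduled during the write;
       # the plan is executed later by an offline HoodieCompactor job (see the Compactions section below)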
       , "hoodie.metadata.index.check.timeout.seconds": "60"
       , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
       , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
   
   }
   
   
   # ====================================================
   """Create Spark Data Frame """
   # ====================================================
   data = DataGenerator.get_data()
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   df = spark.createDataFrame(data=data, schema=columns)
   df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)
   
   
   # ====================================================
   """APPEND """
   # ====================================================
   
   impleDataUpd = [
       (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
       (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
   ]
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   
   # ====================================================
   """UPDATE """
   # ====================================================
   impleDataUpd = [
       (3, "this is update 1** on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
   ]
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   ```
   
   # Compactions
   ```
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ============================== Settings =======================================
       table_name = "employees"
       recordkey = 'emp_id'
       precombine = "ts"
       path = "s3://soumilshah-hudi-demos/hudi/"
   
       # ====================================================================================
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_compaction_{}'.format(table_name)
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
       jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
       # --mode options: schedule (only create a compaction plan) | execute (run a pending plan) | scheduleAndExecute (both)
   
       arguments = [
           '--spark-memory', '5g',
           '--parallelism', '2',
           "--mode", "schedule",
           "--base-path", path,
           "--table-name", table_name,
           "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
           "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", "hoodie.compact.schedule.inline=true",
           "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
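                   # NOTE: on EMR Serverless the entryPoint should point at the application jar
                   # (e.g. the jar_path defined above) rather than command-runner.jar, which is an
                   # EMR-on-EC2 construct; the variant later in this thread passes jar_path here.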
                   'entryPoint': "command-runner.jar",
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   
   # Error
   
   ![image](https://github.com/apache/hudi/assets/39345855/283d765c-39b5-47ac-93d6-c3b08e822cdc)
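   
   To check whether the `schedule` run actually created a plan, one thing that can help is listing the `.hoodie` timeline for a `*.compaction.requested` instant. A minimal sketch (assuming the bucket and prefix from the `path` above and default boto3 credentials):
   
   ```
   import boto3
   
   # Bucket/prefix taken from path = "s3://soumilshah-hudi-demos/hudi/" above.
   s3 = boto3.client("s3")
   resp = s3.list_objects_v2(Bucket="soumilshah-hudi-demos", Prefix="hudi/.hoodie/")
   plans = [obj["Key"] for obj in resp.get("Contents", [])
            if obj["Key"].endswith(".compaction.requested")]
   print(plans or "no compaction plan scheduled yet")
   ```
   
   If nothing shows up, a later `execute` run has no pending plan to pick up.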
   




[GitHub] [hudi] AmareshB commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "AmareshB (via GitHub)" <gi...@apache.org>.
AmareshB commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1635131183

   Can you try EMR 6.11, which ships Hudi 0.13? That worked for me; EMR 6.10 didn't work for me either.
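   
   If it helps, a minimal sketch of creating an EMR Serverless application on that release with boto3 (the application name below is just a placeholder, and the assumption is that emr-6.11.0 ships Hudi 0.13.x):
   
   ```
   import boto3
   
   # Assumes default AWS credentials/region; the application name is hypothetical.
   emr = boto3.client("emr-serverless", region_name="us-east-1")
   
   app = emr.create_application(
       name="hudi-offline-compaction",
       releaseLabel="emr-6.11.0",   # EMR Serverless release that bundles Hudi 0.13.x
       type="SPARK",
   )
   print(app["applicationId"])
   ```
   
   The existing start_job_run call can then target this applicationId; if the Hudi jars under /usr/lib/hudi/ on that image include the utilities bundle, they can be used instead of a hand-uploaded jar.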




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1505561544

   Additionally, a question that keeps coming up in the group: how would you disable automatic compaction? I have a sample Glue script configuration in this thread; is it correct? Your advice will help me understand this better.
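   
   For context, the compaction-related options in the Glue scripts in this thread look like the sketch below; the intent is to turn off inline compaction and only schedule plans, leaving execution to a separate offline job:
   
   ```
   # Writer-side options (as used in the Glue scripts in this thread):
   compaction_options = {
       "hoodie.compact.inline": "false",           # do not compact as part of each write
       "hoodie.compact.schedule.inline": "true",   # still schedule compaction plans inline
       # assumption: a streaming / DeltaStreamer writer would additionally set
       # hoodie.datasource.compaction.async.enable=false to stop async compaction
   }
   ```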




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1517725938

   @ad1happy2go 
   
   can you help here :D 




Re: [I] [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video [hudi]

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 closed issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video 
URL: https://github.com/apache/hudi/issues/8400




[GitHub] [hudi] ad1happy2go commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1520480661

   @soumilshah1995 I see the job ran fine in the logs. Can you check the stderr logs to see if there is any error?




[GitHub] [hudi] ad1happy2go commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1521948897

   @soumilshah1995 Did the compaction succeed on the data?
   Can you paste a screenshot of the table directory after the job finished?




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1533872885

   Any updates, @ad1happy2go?




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1636754865

   Any help would be great :D 




Re: [I] [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video [hudi]

Posted by "dacort (via GitHub)" <gi...@apache.org>.
dacort commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1812973730

   > Any help would be great :D
   
   @soumilshah1995 Did you ever figure this out? Just came across this and happy to help. I work on the EMR team.




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1513432701

   thanks a lot 
   thank you very much 




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1520537782

   As discussed on the call, let me run a few more tests and I will get back to you soon :D




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1520570741

   # Glue Job
   ```
   try:
       import sys
       import os
       from pyspark.context import SparkContext
       from pyspark.sql.session import SparkSession
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from pyspark.sql.types import *
       from datetime import datetime, date
       import boto3
       from functools import reduce
       from pyspark.sql import Row
   
       import uuid
       from faker import Faker
   except Exception as e:
       print("Modules are missing : {} ".format(e))
   
   spark = (SparkSession.builder.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
            .config('spark.sql.hive.convertMetastoreParquet', 'false') \
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   # =================================INSERTING DATA =====================================
   global faker
   faker = Faker()
   
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   x,
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),  # ts; the tuple has 8 fields to match the 8-column schema below
   
               ) for x in range(5)
           ]
   
   
   # ============================== Settings =======================================
   db_name = "hudidb"
   table_name = "employees"
   recordkey = 'emp_id'
   precombine = "ts"
   PARTITION_FIELD = 'state'
   path = "s3://delta-streamer-demo-hudi/hudi/"
   method = 'upsert'
   table_type = "MERGE_ON_READ"
   # ====================================================================================
   
   hudi_part_write_config = {
       'className': 'org.apache.hudi',
   
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': table_type,
       'hoodie.datasource.write.operation': method,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.precombine.field': precombine,
       "hoodie.schema.on.read.enable": "true",
       "hoodie.datasource.write.reconcile.schema": "true",
   
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.datasource.hive_sync.enable': 'true',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.support_timestamp': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name
   
       , "hoodie.clean.automatic": "false"
       , "hoodie.clean.async": "false"
       , "hoodie.clustering.async.enabled": "false"
       , "hoodie.metadata.enable": "false"
       , "hoodie.metadata.index.async": "false"
       , "hoodie.metadata.index.column.stats.enable": "false"
       , "hoodie.compact.inline": "false"
       , 'hoodie.compact.schedule.inline': 'true'
   
       , "hoodie.metadata.index.check.timeout.seconds": "60"
       , "hoodie.write.concurrency.mode": "optimistic_concurrency_control"
       , "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
   
   }
   
   
   # ====================================================
   """Create Spark Data Frame """
   # ====================================================
   data = DataGenerator.get_data()
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   df = spark.createDataFrame(data=data, schema=columns)
   df.write.format("hudi").options(**hudi_part_write_config).mode("overwrite").save(path)
   
   
   # ====================================================
   """APPEND """
   # ====================================================
   
   impleDataUpd = [
       (6, "This is APPEND", "Sales", "RJ", 81000, 30, 23000, 827307999),
       (7, "This is APPEND", "Engineering", "RJ", 79000, 53, 15000, 1627694678),
   ]
   
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   
   # ====================================================
   """UPDATE """
   # ====================================================
   impleDataUpd = [
       (3, "this is update 1 on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
   ]
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   impleDataUpd = [
       (3, "this is update 2 on data lake", "Sales", "RJ", 81000, 30, 23000, 827307999),
   ]
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts"]
   usr_up_df = spark.createDataFrame(data=impleDataUpd, schema=columns)
   usr_up_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
   
   ```
   # Hudi Data Lake 
   ![image](https://user-images.githubusercontent.com/39345855/234073143-d1f18845-d7d1-4afe-b9f3-bf1765089b11.png)
   
   # EMR Job
   ```
   try:
       import json
       import uuid
       import os
       import boto3
       from dotenv import load_dotenv
   
       load_dotenv("../.env")
   except Exception as e:
       pass
   
   global AWS_ACCESS_KEY
   global AWS_SECRET_KEY
   global AWS_REGION_NAME
   
   AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
   AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
   AWS_REGION_NAME = "us-east-1"
   
   client = boto3.client("emr-serverless",
                         aws_access_key_id=AWS_ACCESS_KEY,
                         aws_secret_access_key=AWS_SECRET_KEY,
                         region_name=AWS_REGION_NAME)
   
   
   def lambda_handler_test_emr(event, context):
       # ============================== Settings =======================================
       db_name = "hudidb"
       table_name = "employees"
       recordkey = 'emp_id'
       precombine = "ts"
       PARTITION_FIELD = 'state'
       path = "s3://delta-streamer-demo-hudi/hudi/"
       method = 'upsert'
       table_type = "MERGE_ON_READ"
       # ====================================================================================
       # ---------------------------------------------------------------------------------
       #                                       EMR
       # --------------------------------------------------------------------------------
       ApplicationId = os.getenv("ApplicationId")
       ExecutionTime = 600
       ExecutionArn = os.getenv("ExecutionArn")
       JobName = 'delta_streamer_compaction_{}'.format(table_name)
   
       # --------------------------------------------------------------------------------
       spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
       spark_submit_parameters += ' --class org.apache.hudi.utilities.HoodieCompactor'
       jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
       # --mode options: schedule (only create a compaction plan) | execute (run a pending plan) | scheduleAndExecute (both)
   
       arguments = [
           '--spark-memory', '5g',
           '--parallelism', '2',
           "--mode", "scheduleAndExecute",
           "--base-path", path,
           "--table-name", table_name,
           "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
           "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
           "--hoodie-conf", "hoodie.metadata.index.async=false",
           "--hoodie-conf", "hoodie.metadata.enable=false"
   
       ]
   
       response = client.start_job_run(
           applicationId=ApplicationId,
           clientToken=uuid.uuid4().__str__(),
           executionRoleArn=ExecutionArn,
           jobDriver={
               'sparkSubmit': {
                   'entryPoint': jar_path,
                   'entryPointArguments': arguments,
                   'sparkSubmitParameters': spark_submit_parameters
               },
           },
           executionTimeoutMinutes=ExecutionTime,
           name=JobName,
       )
       print("response", end="\n")
       print(response)
   
   
   lambda_handler_test_emr(context=None, event=None)
   
   ```
   
   # Error 
   ![image](https://user-images.githubusercontent.com/39345855/234073282-3a102329-28c5-44c5-ab51-a92f41b8f311.png)
   
   # Stdout
   
   ```
   23/04/24 17:32:45 INFO SparkContext: Running Spark version 3.3.1-amzn-0
   23/04/24 17:32:45 INFO ResourceUtils: ==============================================================
   23/04/24 17:32:45 INFO ResourceUtils: No custom resources configured for spark.driver.
   23/04/24 17:32:45 INFO ResourceUtils: ==============================================================
   23/04/24 17:32:45 INFO SparkContext: Submitted application: compactor-employees
   23/04/24 17:32:45 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 5120, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
   23/04/24 17:32:45 INFO ResourceProfile: Limiting resource is cpus at 4 tasks per executor
   23/04/24 17:32:46 INFO ResourceProfileManager: Added ResourceProfile id: 0
   23/04/24 17:32:46 INFO SecurityManager: Changing view acls to: hadoop
   23/04/24 17:32:46 INFO SecurityManager: Changing modify acls to: hadoop
   23/04/24 17:32:46 INFO SecurityManager: Changing view acls groups to: 
   23/04/24 17:32:46 INFO SecurityManager: Changing modify acls groups to: 
   23/04/24 17:32:46 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
   23/04/24 17:32:46 INFO deprecation: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
   23/04/24 17:32:46 INFO deprecation: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
   23/04/24 17:32:46 INFO deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
   23/04/24 17:32:46 INFO Utils: Successfully started service 'sparkDriver' on port 38683.
   23/04/24 17:32:46 INFO SparkEnv: Registering MapOutputTracker
   23/04/24 17:32:46 INFO SparkEnv: Registering BlockManagerMaster
   23/04/24 17:32:46 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
   23/04/24 17:32:46 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
   23/04/24 17:32:46 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
   23/04/24 17:32:46 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8f446246-8819-4282-bb21-cd9202f28988
   23/04/24 17:32:46 INFO MemoryStore: MemoryStore started with capacity 7.3 GiB
   23/04/24 17:32:46 INFO SparkEnv: Registering OutputCommitCoordinator
   23/04/24 17:32:46 INFO SubResultCacheManager: Sub-result caches are disabled.
   23/04/24 17:32:46 INFO Utils: Successfully started service 'SparkUI' on port 8090.
   23/04/24 17:32:46 INFO SparkContext: Added JAR s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar at s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar with timestamp 1682357565906
   23/04/24 17:32:47 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
   23/04/24 17:32:47 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
   23/04/24 17:32:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35927.
   23/04/24 17:32:47 INFO NettyBlockTransferService: Server created on [2600:1f18:5856:4301:8471:a289:676c:1ff0]:35927
   23/04/24 17:32:47 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
   23/04/24 17:32:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
   23/04/24 17:32:47 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:8471:a289:676c:1ff0]:35927 with 7.3 GiB RAM, BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
   23/04/24 17:32:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
   23/04/24 17:32:47 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, [2600:1f18:5856:4301:8471:a289:676c:1ff0], 35927, None)
   23/04/24 17:32:47 INFO ExecutorContainerAllocator: Going to request 3 executors for ResourceProfile Id: 0, target: 3 already provisioned: 0.
   23/04/24 17:32:47 INFO DefaultEmrServerlessRMClient: Creating containers with container role SPARK_EXECUTOR and keys: Set(1, 2, 3)
   23/04/24 17:32:47 INFO SingleEventLogFileWriter: Logging events to file:/var/log/spark/apps/00f9k5j6uatf6b09.inprogress
   23/04/24 17:32:47 INFO Utils: Using initial executors = 3, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
   23/04/24 17:32:47 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service.
   23/04/24 17:32:47 INFO ExecutorContainerAllocator: Set total expected execs to {0=3}
   23/04/24 17:32:47 INFO DefaultEmrServerlessRMClient: Containers created with container role SPARK_EXECUTOR. key to container id map: Map(2 -> b4c3da29-8d54-5c47-92b5-cb12498c32a8, 1 -> bcc3da29-8d24-1e3f-c6b7-b941a5b6c570, 3 -> fec3da29-8d30-0e57-f3b6-35b5fe897c79)
   23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:6434:e690:3a6b:a55e:42664) with ID 1,  ResourceProfileId 0
   23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:cddc:9113:28e1:eb26:38970) with ID 3,  ResourceProfileId 0
   23/04/24 17:32:51 INFO ExecutorMonitor: New executor 1 has registered (new total is 1)
   23/04/24 17:32:51 INFO ExecutorMonitor: New executor 3 has registered (new total is 2)
   23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:cddc:9113:28e1:eb26]:43825 with 2.7 GiB RAM, BlockManagerId(3, [2600:1f18:5856:4301:cddc:9113:28e1:eb26], 43825, None)
   23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:6434:e690:3a6b:a55e]:38015 with 2.7 GiB RAM, BlockManagerId(1, [2600:1f18:5856:4301:6434:e690:3a6b:a55e], 38015, None)
   23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (2600:1f18:5856:4301:54a3:8e48:1700:41c7:43444) with ID 2,  ResourceProfileId 0
   23/04/24 17:32:51 INFO ExecutorMonitor: New executor 2 has registered (new total is 3)
   23/04/24 17:32:51 INFO EmrServerlessClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
   23/04/24 17:32:51 INFO BlockManagerMasterEndpoint: Registering block manager [2600:1f18:5856:4301:54a3:8e48:1700:41c7]:34311 with 2.7 GiB RAM, BlockManagerId(2, [2600:1f18:5856:4301:54a3:8e48:1700:41c7], 34311, None)
   23/04/24 17:32:51 INFO S3NativeFileSystem: Opening 's3://delta-streamer-demo-hudi/hudi/.hoodie/hoodie.properties' for reading
   23/04/24 17:32:51 WARN HoodieWriteConfig: Embedded timeline server is disabled, fallback to use direct marker type for spark
   23/04/24 17:32:52 WARN HoodieCompactor: No instant time is provided for scheduling compaction.
   23/04/24 17:32:52 INFO S3NativeFileSystem: Opening 's3://delta-streamer-demo-hudi/hudi/.hoodie/hoodie.properties' for reading
   23/04/24 17:32:52 WARN HoodieCompactor: Couldn't do schedule
   23/04/24 17:32:52 INFO SparkUI: Stopped Spark web UI at http://[2600:1f18:5856:4301:8471:a289:676c:1ff0]:8090
   23/04/24 17:32:52 INFO EmrServerlessClusterSchedulerBackend: Shutting down all executors
   23/04/24 17:32:52 INFO EmrServerlessClusterSchedulerBackend$EmrServerlessDriverEndpoint: Asking each executor to shut down
   23/04/24 17:32:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   23/04/24 17:32:52 INFO MemoryStore: MemoryStore cleared
   23/04/24 17:32:52 INFO BlockManager: BlockManager stopped
   23/04/24 17:32:52 INFO BlockManagerMaster: BlockManagerMaster stopped
   23/04/24 17:32:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   23/04/24 17:32:52 INFO SparkContext: Successfully stopped SparkContext
   23/04/24 17:32:52 INFO ShutdownHookManager: Shutdown hook called
   23/04/24 17:32:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-7d6b3915-7b6d-4bfd-b534-a898b6dd6653
   23/04/24 17:32:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-03ad476f-549d-4bc5-b1f4-16b1a343a93c
   ```
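   
   One likely cause of the `WARN HoodieCompactor: Couldn't do schedule` line above: scheduling only produces a plan when the compaction trigger is met, and with the default `hoodie.compact.inline.max.delta.commits` of 5 a table with only a few delta commits has nothing eligible, so the job exits without scheduling anything. A sketch of the same argument list with the threshold lowered (the other values are unchanged from the job above):
   
   ```
   arguments = [
       '--spark-memory', '5g',
       '--parallelism', '2',
       "--mode", "scheduleAndExecute",
       "--base-path", path,
       "--table-name", table_name,
       "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(recordkey),
       "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
       "--hoodie-conf", "hoodie.metadata.index.async=false",
       "--hoodie-conf", "hoodie.metadata.enable=false",
       # lower the trigger so a plan is created after a single delta commit
       "--hoodie-conf", "hoodie.compact.inline.max.delta.commits=1"
   ]
   ```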




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1520484755

   I don't see much there. I am free to connect with you now if you want.




[GitHub] [hudi] soumilshah1995 commented on issue #8400: [SUPPORT] Hudi Offline Compaction in EMR Serverless 6.10 for YouTube Video

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8400:
URL: https://github.com/apache/hudi/issues/8400#issuecomment-1523923026

   Thanks 

