Posted to commits@hudi.apache.org by "soumilshah1995 (via GitHub)" <gi...@apache.org> on 2023/02/22 13:32:42 UTC

[GitHub] [hudi] soumilshah1995 opened a new issue, #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI

soumilshah1995 opened a new issue, #8019:
URL: https://github.com/apache/hudi/issues/8019

   Hello,
   Hope you are doing well. This is a question from one of the Hudi community members on Slack, and I am creating a support ticket on his behalf.
   
   The question is:
   How can we run disaster recovery in PySpark rather than through the Hudi CLI?
   The link below shows how to do it with the Hudi CLI. Is there any code or example using PySpark, and how can you call a procedure from PySpark?
   ![image](https://user-images.githubusercontent.com/39345855/220634514-cb1207eb-e4de-46d0-979b-4a49f2756e22.png)
   
   
   https://hudi.apache.org/docs/next/disaster_recovery/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440244262

   Hi @papablus,
   
   If I add this configuration, will I be able to call procedures in Glue 4.0?
   Also, could you please help with snippets for taking a backup or rolling back to older commits in PySpark? I see there is documentation for the Hudi CLI, but there are no examples for PySpark.
   
   It would really help the community if you can help ;D




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440353963

   Hi @papablus,
   Is this the right way to call the show_commits procedure?
   
   ```
   try:
       import sys
       from awsglue.transforms import *
       from awsglue.utils import getResolvedOptions
       from pyspark.context import SparkContext
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from pyspark.sql.session import SparkSession
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from awsglueml.transforms import EntityDetector
       from pyspark.sql.types import StringType
       from pyspark.sql.types import *
       from datetime import datetime
       import boto3
       from functools import reduce
   except Exception as e:
       print("Error:", e)
   
   spark = (SparkSession.builder
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
            .config('spark.sql.hive.convertMetastoreParquet', 'false')
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   db_name = "hudidb"
   table_name = "hudi_table"
   
   recordkey = 'emp_id'
   path = "s3://hudi-demos-emr-serverless-project-soumil/tmp/"
   
   method = 'upsert'
   table_type = "COPY_ON_WRITE"
   precombine = "ts"
   partition_field = "date"
   
   connection_options = {
       "path": path,
       "connectionName": "hudi-connection",
   
       "hoodie.datasource.write.storage.type": table_type,
       'hoodie.datasource.write.precombine.field': precombine,
       'className': 'org.apache.hudi',
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
   
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode": "hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   
   }
   
   df = spark. \
       read. \
       format("hudi"). \
       load(path)
   
   print("\n")
   print(df.show(2))
   print("\n")
   
   
   
   
   # Attempt 1: unquoted table reference
   try:
       query = """ 
       call show_commits(hudidb.hudi_table);
       """
       print(spark.sql(query).show())
   except Exception as e:
       print("Error 1", e)
   
   # Attempt 2: no arguments
   try:
       query = """ 
       call show_commits();
       """
       print(spark.sql(query).show())
   except Exception as e:
       print("Error 2", e)
   ```
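   
   For reference, the calling convention used elsewhere in this thread (and in the Hudi SQL procedures documentation) passes the table name as a quoted string with an optional limit. A minimal sketch reusing the `spark` session and the `db_name`/`table_name` variables defined above; whether the table resolves still depends on it being registered as a Hudi table in the catalog, which is what the follow-up comments in this thread run into:
   
   ```
   # Positional form: quoted table identifier plus a row limit.
   spark.sql(f"call show_commits('{db_name}.{table_name}', 10)").show()
   
   # Named-argument form, as used later in this thread.
   spark.sql(f"call show_commits(table => '{db_name}.{table_name}', limit => 10)").show()
   ```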
   
   
   




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440260111

   Hi @papablus,
   
   I'm trying that now and will update you shortly :D




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440395879

   Hi @papablus,
   I tried show_commits and I'm getting the following error. Can you please tell me if there is something I am not doing right?
   
   ```
   try:
       import sys
       from awsglue.transforms import *
       from awsglue.utils import getResolvedOptions
       from pyspark.context import SparkContext
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from pyspark.sql.session import SparkSession
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from awsglueml.transforms import EntityDetector
       from pyspark.sql.types import StringType
       from pyspark.sql.types import *
       from datetime import datetime
       import boto3
       from functools import reduce
   except Exception as e:
       print("Error:", e)
   
   spark = (SparkSession.builder
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
            .config('spark.sql.hive.convertMetastoreParquet', 'false')
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   db_name = "hudidb"
   table_name = "hudi_table"
   
   recordkey = 'emp_id'
   path = "s3://hudi-demos-emr-serverless-project-soumil/tmp/"
   
   method = 'upsert'
   table_type = "COPY_ON_WRITE"
   precombine = "ts"
   partition_field = "date"
   
   connection_options = {
       "path": path,
       "connectionName": "hudi-connection",
   
       "hoodie.datasource.write.storage.type": table_type,
       'hoodie.datasource.write.precombine.field': precombine,
       'className': 'org.apache.hudi',
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
   
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode": "hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   
   }
   
   df = spark. \
       read. \
       format("hudi"). \
       load(path)
   
   print("\n")
   print(df.show(2))
   print("\n")
   
   
   
   
   try:
       varSP = spark.sql("call show_commits('hudidb.hudi_table', 10)")
       print(varSP.show())
   except Exception as e:
       print("Error 1", e)
   
   
   
   
   try:
       varSP = spark.sql("call show_commits('hudidb.hudi_table', '10')")
       print(varSP.show())
   except Exception as e:
       print("Error 2", e)
   
   
   ```
   
   ### Error Message 
   ```
   An error occurred while calling o90.sql.
   : java.lang.AssertionError: assertion failed: It's not a Hudi table
       at scala.Predef$.assert(Predef.scala:219)
       at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.<init>(HoodieCatalogTable.scala:51)
       at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:367)
       at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable$.apply(HoodieCatalogTable.scala:363)
       at org.apache.hudi.HoodieCLIUtils$.getHoodieCatalogTable(HoodieCLIUtils.scala:70)
       at org.apache.spark.sql.hudi.command.procedures.ShowCommitsProcedure.call(ShowCommitsProcedure.scala:82)
       at org.apache.spark.sql.hudi.command.CallProcedureHoodieCommand.run(CallProcedureHoodieCommand.scala:33)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
       at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
       at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
       at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
       at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
       at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
       at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
       at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
       at org.apache.spark.sql.execution.QueryExecution$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
       at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
       at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$super$transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
       at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
       at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
       at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
       at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
       at org.apache.spark.sql.Dataset.<init>(Dataset.scala:222)
       at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:102)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:99)
       at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
       at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
       at py4j.Gateway.invoke(Gateway.java:282)
       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
       at py4j.commands.CallCommand.execute(CallCommand.java:79)
       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
       at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
       at java.lang.Thread.run(Thread.java:750)
   ```
   
   ![image](https://user-images.githubusercontent.com/39345855/220696561-74fc96c0-30a6-4f87-acf3-ee9789a966d6.png)
   
   




[GitHub] [hudi] soumilshah1995 closed issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 closed issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark 
URL: https://github.com/apache/hudi/issues/8019




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440080735

   Hello @kazdy, thank you very much for such a prompt reply.
   There are a lot of Python users who would love to see some Python examples. Is there any Python example that shows how to recover or roll back to older checkpoints?
   If not, we can post sample code. I am happy to create tutorials for the community; that way we can make the information easily accessible to everyone :D
   




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440886019

   # My issue has been resolved and I will be making content for the community
   
   <img width="784" alt="1231" src="https://user-images.githubusercontent.com/39345855/220772505-259d7685-c52d-4884-8d25-427004532ba5.PNG">
   
   
   <img width="790" alt="Capture" src="https://user-images.githubusercontent.com/39345855/220772510-858060a7-f3b4-4214-8799-5b4f145dd9b7.PNG">
   




[GitHub] [hudi] nfarah86 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "nfarah86 (via GitHub)" <gi...@apache.org>.
nfarah86 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1441199915

   thanks @soumilshah1995 




[GitHub] [hudi] soumilshah1995 commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI

Posted by "soumilshah1995 (via GitHub)" <gi...@apache.org>.
soumilshah1995 commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440153705

   Here is the sample code; I am not sure how to actually call this in PySpark.
   
   ```
   """
   %connections hudi-connection
   %glue_version 3.0
   %region us-east-1
   %worker_type G.1X
   %number_of_workers 3
   %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
   %additional_python_modules Faker
   
   """
   
   try:
       import sys
       from awsglue.transforms import *
       from awsglue.utils import getResolvedOptions
       from pyspark.context import SparkContext
       from awsglue.context import GlueContext
       from awsglue.job import Job
       from pyspark.sql.session import SparkSession
       from awsglue.dynamicframe import DynamicFrame
       from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
       from pyspark.sql.functions import *
       from awsglue.utils import getResolvedOptions
       from awsglueml.transforms import EntityDetector
       from pyspark.sql.types import StringType
       from pyspark.sql.types import *
       from datetime import datetime
   
       import boto3
       from functools import reduce
   except Exception as e:
       print("Error:", e)
   
   
   
   spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.hive.convertMetastoreParquet','false') \
       .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true') .getOrCreate()
   
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   
   
   import uuid
   from faker import Faker
   
   global faker
   faker = Faker()
   
   class DataGenerator(object):
   
       @staticmethod
       def get_data():
           return [
               (
                   uuid.uuid4().__str__(),
                   faker.name(),
                   faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                   faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                   str(faker.random_int(min=10000, max=150000)),
                   str(faker.random_int(min=18, max=60)),
                   str(faker.random_int(min=0, max=100000)),
                   str(faker.unix_time()),
                   faker.email(),
                   faker.credit_card_number(card_type='amex'),
                   faker.date()
               ) for x in range(100)
           ]
   
   data = DataGenerator.get_data()
   columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card", "date"]
   spark_df = spark.createDataFrame(data=data, schema=columns)
   
   
   db_name = "hudidb"
   table_name="hudi_table"
   
   recordkey = 'emp_id'
   path = "s3://hudi-demos-emr-serverless-project-soumil/tmp/"
   
   method = 'upsert'
   table_type = "COPY_ON_WRITE"
   precombine = "ts"
   partition_field = "date"
   
   connection_options={
       "path": path,
       "connectionName": "hudi-connection",
   
       "hoodie.datasource.write.storage.type": table_type,
       'hoodie.datasource.write.precombine.field': precombine,
       'className': 'org.apache.hudi',
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.recordkey.field': recordkey,
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': method,
   
       'hoodie.datasource.hive_sync.enable': 'true',
       "hoodie.datasource.hive_sync.mode":"hms",
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': db_name,
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   
   
   }
   
   WriteDF = (
       glueContext.write_dynamic_frame.from_options(
           frame=DynamicFrame.fromDF(spark_df, glueContext,"glue_df"),
           connection_type="marketplace.spark",
           connection_options=connection_options,
           transformation_ctx="glue_df",
       )
   )
   
   df = spark. \
       read. \
       format("hudi"). \
       load(path)
   
   #### Attempt 1: '=' and a bare limit
   query = """ 
   call show_commits(table = 'hudi_table' limit 10 );
   """
   spark.sql(query).show()
   
   #### Attempt 2: named arguments with '=>'
   query = """ 
   call show_commits(table => 'hudi_table', limit => 10);
   """
   spark.sql(query).show()
   
   ```
   #### Error Message 
   ```
   ParseException: 
   mismatched input 'call' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 2, pos 0)
   
   == SQL ==
    
   call show_commits(table = 'hudi_table' limit 10 );
   ^^^
   ```
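   
   The ParseException above suggests this Spark session does not recognize the CALL statement at all; as papablus points out later in the thread, the Hudi SQL extension (and catalog) must be registered on the session. A sketch of the extra session configuration, assuming the Hudi connector/bundle is already on the job's classpath:
   
   ```
   from pyspark.sql.session import SparkSession
   
   # Same settings as above, plus the Hudi catalog and SQL extension that make
   # "call <procedure>(...)" parseable by Spark SQL.
   spark = (SparkSession.builder
            .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
            .config('spark.sql.hive.convertMetastoreParquet', 'false')
            .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.hudi.catalog.HoodieCatalog')
            .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
            .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
            .getOrCreate())
   
   # Named-argument form of the procedure call used later in this thread.
   spark.sql("call show_commits(table => 'hudidb.hudi_table', limit => 10)").show()
   ```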
   




[GitHub] [hudi] papablus commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "papablus (via GitHub)" <gi...@apache.org>.
papablus commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440237517

   Hi, I am restoring a savepoint using Glue 4 and Hudi 0.12.1 with this Spark configuration:
   
   ```
   spark = (SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
           .config('spark.sql.hive.convertMetastoreParquet','false') \
           .config('spark.sql.catalog.spark_catalog','org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
           .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
           .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   ```
           
           
   I see that `spark.sql.extensions` is mandatory when you are going to call a procedure.
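   
   For example, with a session configured this way, a procedure call is just a SQL statement issued through `spark.sql()`. A small sketch (the table name is a placeholder taken from earlier in this thread):
   
   ```
   # Assumes `spark` was built with the HoodieCatalog and
   # HoodieSparkSessionExtension configs shown above.
   spark.sql("call show_savepoints('hudidb.hudi_table')").show()
   ```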




[GitHub] [hudi] papablus commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI and use proc in PySpark

Posted by "papablus (via GitHub)" <gi...@apache.org>.
papablus commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440254996

   Hi @soumilshah1995, 
   
   This is my Glue job (Glue 4 - Spark 3.3 - Python 3):
   
   ```
   import sys
   from pyspark.context import SparkContext
   from pyspark.sql.session import SparkSession
   from awsglue.context import GlueContext
   from awsglue.job import Job
   from awsglue.dynamicframe import DynamicFrame
   from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
   from pyspark.sql.functions import *
   from awsglue.utils import getResolvedOptions
   from pyspark.sql.types import *
   from datetime import datetime, date
   import boto3
   from functools import reduce
   from pyspark.sql import Row
   
   args = getResolvedOptions(sys.argv, ['JOB_NAME',
                                       'OUTPUT_BUCKET',
                                       'HUDI_INIT_SORT_OPTION',
                                       'HUDI_TABLE_NAME',
                                       'HUDI_DB_NAME',
                                       'CATEGORY_ID'
                                       ])
   print(args)
   # print("The name of the bucket is :- ",  args['curated_bucket'])
   job_start_ts = datetime.now()
   ts_format = '%Y-%m-%d %H:%M:%S'
   spark = (SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer') \
           .config('spark.sql.hive.convertMetastoreParquet','false') \
           .config('spark.sql.catalog.spark_catalog','org.apache.spark.sql.hudi.catalog.HoodieCatalog') \
           .config('spark.sql.extensions','org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
           .config('spark.sql.legacy.pathOptionBehavior.enabled', 'true').getOrCreate())
   sc = spark.sparkContext
   glueContext = GlueContext(sc)
   job = Job(glueContext)
   logger = glueContext.get_logger()
   job.init(args['JOB_NAME'], args)
   
   
   def perform_hudi_bulk_insert(
                                hudi_init_sort_option,
                                hudi_output_bucket,
                                hudi_table_name,
                                hudi_db_name,
                                category_id
                               ):
       
       hudi_table_type =  'COPY_ON_WRITE'
     
       hudi_table_name = hudi_table_name.lower() + '_' + hudi_init_sort_option.lower()
   
       # create the schema for the ny trips raw data file in CSV format. 
        
       #RECORD_KEY: C 
       #PRECOMBINE FIELD: F
       #PARTITION FIELD: D
       
       # PySpark DataFrame with explicit schema.
       df = spark.createDataFrame([
           (10000001, '110000001', 'part1', 'date(1100000001, 81, 1)', 19, 201),
           (10000001, '110000001', 'part1', 'date(1100000001, 81, 1)', 19, 202),
           (20000001, '220000001', 'part1', 'date(2200000001, 82, 1)', 29, 202), 
           (20000001, '220000001', 'part1', 'date(2200000001, 82, 1)', 29, 201), 
           (30000001, '330000001', 'part1', 'date(3300000001, 83, 1)', 30, 203),
           (30000001, '330000001', 'part1', 'date(3300000001, 83, 1)', 32, 203),
           (30000001, '330000001', 'part1', 'date(3300000001, 83, 1)', 34, 203),
           (30000001, '330000001', 'part1', 'date(3300000001, 83, 1)', 36, 203),
           (40000001, '440000001', 'part1', 'date(4400000001, 84, 1)', 47, 888),
           (40000001, '440000001', 'part1', 'date(4400000001, 84, 1)', 43, 888),
           (40000001, '440000001', 'part1', 'date(4400000001, 84, 1)', 45, 888),        
           (40000001, '440000001', 'part1', 'date(4400000001, 84, 1)', 41, 888)
       ], 
       schema='a int, c string, d string, e string, f int, g int')
        
       #df = spark.createDataFrame([
       #], 
       #schema='a int, c string, d string, e string, f int, g int')
         
       # show table
       df.show()
         
       # show schema 
       df.printSchema()
       
       RECORD_KEY = "c"
       PARTITION_FIELD = "d"
       PRECOMBINE_FIELD = "f"
       
       table_schema = StructType(
               [ StructField("a",IntegerType(),True), 
                 StructField("b",DoubleType(),True), 
                 StructField("c",StringType(),True), 
                 StructField("d", DateType(), True), 
                 StructField("e", DateType(), True), 
                 StructField("f", IntegerType(), True)
               ])
           
       #table_schema2 = '{"type" : "record","name" : "userInfo","namespace" : "my.example","fields" : [{"name" : "a", "type" : "int"},{"name" : "b", "type" : "int"},{"name" : "c", "type" : "string"},{"name" : "d", "type" : "string"},{"name" : "e", "type" : "string"},{"name" : "f", "type" : "int"}]}'
       
       curr_session = boto3.session.Session()
       curr_region = curr_session.region_name
       # set the hudi configuration 
       hudi_part_write_config = {
           'className': 'org.apache.hudi',
           'hoodie.datasource.hive_sync.enable':'true',
           'hoodie.datasource.hive_sync.use_jdbc':'false',
           'hoodie.datasource.hive_sync.support_timestamp': 'false',
           'hoodie.datasource.write.operation': 'bulk_insert',
           'hoodie.datasource.write.table.type': hudi_table_type,
           'hoodie.table.name': hudi_table_name,
           'hoodie.datasource.hive_sync.table': hudi_table_name,
           'hoodie.datasource.write.recordkey.field': RECORD_KEY,
           'hoodie.datasource.write.precombine.field': PRECOMBINE_FIELD,
           'hoodie.datasource.write.partitionpath.field': 'd:SIMPLE',
           'hoodie.datasource.write.hive_style_partitioning': 'true',
           'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
           'hoodie.datasource.hive_sync.partition_fields': PARTITION_FIELD,
           'hoodie.datasource.hive_sync.database': hudi_db_name,
           'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
           'hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled' : 'true',
           'hoodie.write.concurrency.mode' : 'single_writer',
           'hoodie.cleaner.policy.failed.writes' : 'EAGER',
           'hoodie.combine.before.insert' : 'true'
   
          ,'hoodie.cleaner.policy' : 'KEEP_LATEST_COMMITS'
          ,'hoodie.clean.automatic':'true'
          #,'hoodie.clean.async':'true'
          ,'hoodie.cleaner.commits.retained' : 4
          ,'hoodie.datasource.hive_sync.mode' : 'hms'
   
           
           }
           
   
       table_path = 's3://{}/{}'.format(hudi_output_bucket, hudi_table_name)
       
       print('The path for Hudi table where data is stored', table_path)
   
       # only set this Hudi parameter if a bulk insert sort option other than the default is chosen
       if hudi_init_sort_option.upper() in ['PARTITION_SORT', 'NONE']:
           hudi_part_write_config['hoodie.bulkinsert.sort.mode'] = hudi_init_sort_option
           
       start_tm = datetime.now()
       print('The start time for the Bulk Insert :-' , start_tm)
       
       df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(table_path)
      
       end_tm = datetime.now()
       print('The End time for the Bulk Insert :-' , end_tm)
       time_diff = end_tm - start_tm
       print('The time it took for Bulk Insert operation :-' , time_diff)
   
       # Create the savepoint
       #varSP = spark.sql("call create_savepoint('hudi2.live_demo_restore_global_sort', '20230124192052120')")
       #print(varSP.show())
       
       # Delete the savepoint
       #varSP0 = spark.sql("call delete_savepoint('hudi2.live_demo_restore_global_sort', '20230124192052120')")
       #print(varSP0.show())
       
       # Show savepoints
       #varSP2 = spark.sql("call show_savepoints('hudi2.live_demo_restore_global_sort')")
       #print(varSP2.show())
       
       # Delete the savepoint
       #varSP0 = spark.sql("call delete_savepoint('hudi2.live_demo_restore_global_sort', '20230118154353907')")
       #print(varSP0.show())
       
       # Roll back to the savepoint
       #varSP = spark.sql("call rollback_to_savepoint('hudi2.live_demo_restore_global_sort', '20230124192052120')")
       #print(varSP.show())
   
   if __name__ == "__main__":
       
       try:
           perform_hudi_bulk_insert(args['HUDI_INIT_SORT_OPTION'],
                                    args['OUTPUT_BUCKET'],
                                    args['HUDI_TABLE_NAME'],
                                    args['HUDI_DB_NAME'],
                                    args['CATEGORY_ID']
                                    
                               )
       except Exception as e:
           print(e)
           raise
                   
   job.commit()
   ```




[GitHub] [hudi] kazdy commented on issue #8019: [SUPPORT] How to Restore from Checkpoints from disaster recovery in hudi using Pyspark and not Hudi CLI

Posted by "kazdy (via GitHub)" <gi...@apache.org>.
kazdy commented on issue #8019:
URL: https://github.com/apache/hudi/issues/8019#issuecomment-1440075626

   Hi
   
   Yes, you can call Spark procedures from PySpark; simply wrap your CALL command in
   `spark.sql("call <procedure>(<args>)")`.
   Just make sure the Spark session is properly configured for Hudi (catalog, SQL extensions, as described in the Spark quickstart).
   
   Here are the Spark procedures available to users; most are not documented yet:
   https://github.com/apache/hudi/tree/master/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures
   
   For disaster recovery you are probably looking for:
   CreateSavepointProcedure.scala
   DeleteSavepointProcedure.scala
   RollbackToSavepointProcedure.scala
   ShowSavepointsProcedure.scala
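   
   Putting these pointers together, a minimal PySpark sketch of the savepoint-based recovery flow, using the positional-argument style papablus shows elsewhere in this thread (the table name and commit instant are placeholders, and the session must already be configured with the Hudi catalog and SQL extensions):
   
   ```
   # Hypothetical table name and commit instant -- substitute your own values.
   table = "hudidb.hudi_table"
   commit_time = "20230222120000000"
   
   # 1. "Backup": create a savepoint at a known-good commit.
   spark.sql(f"call create_savepoint('{table}', '{commit_time}')").show()
   
   # 2. Confirm the savepoint exists.
   spark.sql(f"call show_savepoints('{table}')").show()
   
   # 3. Disaster recovery: restore the table back to the savepointed commit.
   spark.sql(f"call rollback_to_savepoint('{table}', '{commit_time}')").show()
   
   # 4. Optionally remove the savepoint once it is no longer needed.
   spark.sql(f"call delete_savepoint('{table}', '{commit_time}')").show()
   ```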

