Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/02 12:17:46 UTC

[GitHub] [hudi] schlichtanders commented on issue #6808: [SUPPORT] Cannot sync to spark embedded derby hive meta store (the default one)

schlichtanders commented on issue #6808:
URL: https://github.com/apache/hudi/issues/6808#issuecomment-1264629689

   Thank you for your help. Have you tried to replicate it on your side?
   
   I added those configs so that the Spark SQL settings now show the following:
   ```
   +--------------------------------------------------------------+------------------------------------+
   |key                                                           |value                               |
   +--------------------------------------------------------------+------------------------------------+
   |spark.hadoop.hive.metastore.schema.verification               |false                               |
   |spark.hadoop.hive.metastore.schema.verification.record.version|false                               |
   |spark.hadoop.javax.jdo.option.ConnectionDriverName            |org.apache.derby.jdbc.EmbeddedDriver|
   |spark.hadoop.javax.jdo.option.ConnectionURL                   |*********(redacted)                 |
   +--------------------------------------------------------------+------------------------------------+
   ```
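   (This table is the output of `spark.sql("set").filter("key rlike 'metastore|jdo'").show(1000,False)` from the script below.)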
   
   Now the code runs through, but no tables get registered. As a test, I also run `df.write.saveAsTable("saveastable_table")`, which indeed works.
   
   I won't have the resources to debug this further. I switched to Delta Lake, which works out of the box with Spark's local metastore (the one auto-enabled by merely setting `spark.sql.catalogImplementation=hive`, or by calling `SparkSession.builder.enableHiveSupport()`).
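
   For comparison, this is roughly the minimal local-metastore setup I mean. It uses only plain Spark APIs (no Hudi), and with it `saveAsTable` registers tables in the embedded Derby metastore out of the box:

   ```python
   from pyspark.sql import SparkSession

   # Plain local Spark session with the embedded Hive (Derby) metastore enabled.
   # enableHiveSupport() is equivalent to setting spark.sql.catalogImplementation=hive.
   spark = (
       SparkSession.builder
       .master("local[*]")
       .enableHiveSupport()
       .getOrCreate()
   )

   df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
   df.write.mode("overwrite").saveAsTable("plain_spark_table")

   # The table shows up in the embedded metastore without any extra configuration.
   spark.sql("SHOW TABLES FROM default").show()
   ```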
   
   Being able to test Hudi locally with a local metastore is really crucial for us, so it would be great if Hudi could support this in the future, ideally with a working example in the documentation. In the best case the default Spark metastore would work out of the box (as it does for Delta Lake).
   
   
   --------------
   
   # Details about my current attempt
   
   ```python
   from pyspark.sql import SparkSession
   from pathlib import Path
   import os
   
   os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
       # hudi config
       "--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0",
       "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer",
       "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
       "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
       # "--conf spark.sql.hive.convertMetastoreParquet=false", # taken from AWS example
       # others
       # "--conf spark.hadoop.hive.metastore.uris=jdbc:derby:;databaseName=metastore_db;create=true",
       # "--conf spark.hadoop.hive.metastore.uris=''",
       # f"--conf spark.sql.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
       # "--conf spark.eventLog.enabled=false",
       "--conf spark.sql.catalogImplementation=hive",
       "--conf spark.hadoop.hive.metastore.schema.verification=false",
       "--conf spark.hadoop.hive.metastore.schema.verification.record.version=false",
       "--conf spark.hadoop.javax.jdo.option.ConnectionDriverName='org.apache.derby.jdbc.EmbeddedDriver'",
       "--conf spark.hadoop.javax.jdo.option.ConnectionURL='jdbc:derby:memory:myInMemDB;create=true'",
       "--conf spark.hadoop.datanucleus.schema.autoCreateTables=true",
       # f"--conf spark.sql.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
       # f"--conf spark.sql.hive.metastore.warehouse.dir={Path('.').absolute() / 'metastore_warehouse'}",
       # necessary last string
       "pyspark-shell",
   ])
   os.environ["PYSPARK_SUBMIT_ARGS"]
   
   spark = SparkSession.builder.getOrCreate()
   spark.sql("set").filter("key rlike 'metastore|jdo'").show(1000,False)
   
   sc = spark.sparkContext
   
   sc.setLogLevel("WARN")
   dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
   inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
       dataGen.generateInserts(10)
   )
   from pyspark.sql.functions import expr
   
   df = spark.read.json(spark.sparkContext.parallelize(inserts, 10)).withColumn(
       "part", expr("'foo'")
   )
   df.toPandas()
   
   tableName = "test_hudi_pyspark_local"
   basePath = f"{Path('.').absolute()}/tmp/{tableName}"
   
   hudi_options = {
       "hoodie.table.name": tableName,
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "part",
       "hoodie.datasource.write.table.name": tableName,
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.precombine.field": "ts",
       # "hoodie.upsert.shuffle.parallelism": 2,
       # "hoodie.insert.shuffle.parallelism": 2,
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": tableName,
       "hoodie.datasource.hive_sync.enable": "true",
       # "hoodie.datasource.meta.sync.enable": "true",
       # "hoodie.datasource.hive_sync.mode": "hiveql",
       "hoodie.datasource.hive_sync.mode": "hms",
       # "hoodie.datasource.hive_sync.mode": "jdbc",
       # "hoodie.datasource.hive_sync.username": "APP",
       # "hoodie.datasource.hive_sync.use_jdbc": "false",
       # "hoodie.datasource.hive_sync.jdbcurl": f"jdbc:derby:;databaseName={Path('.').absolute() / 'metastore_db'};create=true",
       # "hoodie.datasource.hive_sync.jdbcurl": "jdbc:derby:;databaseName=metastore_db;create=true",
       "hoodie.datasource.hive_sync.partition_fields": "part",
       "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
       "index.global.enabled": "true",
       "hoodie.index.type": "GLOBAL_BLOOM",
   }
   (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   spark.sql("Show tables from default").toPandas()
   
   df.write.saveAsTable("saveastable_table")
   spark.sql("Show tables from default").toPandas()
   ```
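
   To double-check what actually ends up in the metastore, I also look at the catalog API (the programmatic equivalent of the `SHOW TABLES` query above); only the `saveastable_table` entry shows up, the Hudi table does not:

   ```python
   # Programmatic view of the metastore contents; equivalent to SHOW TABLES FROM default.
   for table in spark.catalog.listTables("default"):
       print(table.name, table.tableType)
   ```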
   
   The Hudi write outputs:
   ```
   22/10/02 14:12:44 WARN HoodieSparkSqlWriter$: hoodie table at /home/ssahm/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/tmp/tmp/test_hudi_pyspark_local already exists. Deleting existing data & overwriting with new data.
   22/10/02 14:12:44 WARN HoodieBackedTableMetadata: Metadata table was not found at path /home/ssahm/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/tmp/tmp/test_hudi_pyspark_local/.hoodie/metadata
   ```
   and if I don't use `hms` but enable 
   ```
   "hoodie.datasource.hive_sync.mode": "jdbc",
   "hoodie.datasource.hive_sync.use_jdbc": "true",
   ```
   I even get an error:
   ```
   ---------------------------------------------------------------------------
   Py4JJavaError                             Traceback (most recent call last)
   Cell In [16], line 29
         2 basePath = f"{Path('.').absolute()}/tmp/{tableName}"
         4 hudi_options = {
         5     "hoodie.table.name": tableName,
         6     "hoodie.datasource.write.recordkey.field": "uuid",
      (...)
        27     "hoodie.index.type": "GLOBAL_BLOOM",
        28 }
   ---> 29 (df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
   
   File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py:740, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
       738     self._jwrite.save()
       739 else:
   --> 740     self._jwrite.save(path)
   
   File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
      1315 command = proto.CALL_COMMAND_NAME +\
      1316     self.command_header +\
      1317     args_command +\
      1318     proto.END_COMMAND_PART
      1320 answer = self.gateway_client.send_command(command)
   -> 1321 return_value = get_return_value(
      1322     answer, self.gateway_client, self.target_id, self.name)
      1324 for temp_arg in temp_args:
      1325     temp_arg._detach()
   
   File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
       109 def deco(*a, **kw):
       110     try:
   --> 111         return f(*a, **kw)
       112     except py4j.protocol.Py4JJavaError as e:
       113         converted = convert_exception(e.java_exception)
   
   File ~/Projects_Freelance/Fielmann/bi_kls_data_ocean_import/src/.venv/lib/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
       324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
       325 if answer[1] == REFERENCE_TYPE:
   --> 326     raise Py4JJavaError(
       327         "An error occurred while calling {0}{1}{2}.\n".
       328         format(target_id, ".", name), value)
       329 else:
       330     raise Py4JError(
       331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
       332         format(target_id, ".", name, value))
   
   Py4JJavaError: An error occurred while calling o217.save.
   : org.apache.hudi.exception.HoodieException: Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool
   	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:58)
   	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2(HoodieSparkSqlWriter.scala:648)
   	at org.apache.hudi.HoodieSparkSqlWriter$.$anonfun$metaSync$2$adapted(HoodieSparkSqlWriter.scala:647)
   	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
   	at org.apache.hudi.HoodieSparkSqlWriter$.metaSync(HoodieSparkSqlWriter.scala:647)
   	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:734)
   	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:338)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:183)
   	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
   	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
   	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
   	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
   	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
   	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
   	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
   	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
   	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
   	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
   	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
   	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   	at py4j.Gateway.invoke(Gateway.java:282)
   	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   	at py4j.commands.CallCommand.execute(CallCommand.java:79)
   	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
   	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.hive.HiveSyncTool
   	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
   	at org.apache.hudi.sync.common.util.SyncUtilHelpers.instantiateMetaSyncTool(SyncUtilHelpers.java:75)
   	at org.apache.hudi.sync.common.util.SyncUtilHelpers.runHoodieMetaSync(SyncUtilHelpers.java:56)
   	... 48 more
   Caused by: java.lang.reflect.InvocationTargetException
   	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
   	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   	at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
   	... 50 more
   Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Got runtime exception when hive syncing
   	at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:106)
   	at org.apache.hudi.hive.HiveSyncTool.<init>(HiveSyncTool.java:95)
   	... 55 more
   Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Failed to create HiveMetaStoreClient
   	at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:95)
   	at org.apache.hudi.hive.HiveSyncTool.initSyncClient(HiveSyncTool.java:101)
   	... 56 more
   Caused by: org.apache.hudi.hive.HoodieHiveSyncException: Cannot create hive connection jdbc:hive2://localhost:10000/
   	at org.apache.hudi.hive.ddl.JDBCExecutor.createHiveConnection(JDBCExecutor.java:107)
   	at org.apache.hudi.hive.ddl.JDBCExecutor.<init>(JDBCExecutor.java:59)
   	at org.apache.hudi.hive.HoodieHiveSyncClient.<init>(HoodieHiveSyncClient.java:85)
   	... 57 more
   Caused by: java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: java.net.ConnectException: Connection refused (Connection refused)
   	at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:224)
   	at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107)
   	at java.sql.DriverManager.getConnection(DriverManager.java:664)
   	at java.sql.DriverManager.getConnection(DriverManager.java:247)
   	at org.apache.hudi.hive.ddl.JDBCExecutor.createHiveConnection(JDBCExecutor.java:104)
   	... 59 more
   Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
   	at org.apache.thrift.transport.TSocket.open(TSocket.java:226)
   	at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:266)
   	at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:38)
   	at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:311)
   	at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:196)
   	... 63 more
   Caused by: java.net.ConnectException: Connection refused (Connection refused)
   	at java.net.PlainSocketImpl.socketConnect(Native Method)
   	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
   	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
   	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
   	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
   	at java.net.Socket.connect(Socket.java:607)
   	at org.apache.thrift.transport.TSocket.open(TSocket.java:221)
   	... 67 more
   
   ```
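
   As far as I can read the trace, the `jdbc` sync mode expects a running HiveServer2 at `jdbc:hive2://localhost:10000`, which of course does not exist in this purely local setup. If I understand the configs correctly, one would have to point `hoodie.datasource.hive_sync.jdbcurl` at an actual HiveServer2; the host in the sketch below is only a placeholder, not something I have tested:

   ```python
   # Only relevant if a HiveServer2 is actually running somewhere; the host/port
   # below is a hypothetical placeholder, not a tested configuration.
   hudi_options.update({
       "hoodie.datasource.hive_sync.mode": "jdbc",
       "hoodie.datasource.hive_sync.use_jdbc": "true",
       "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://some-hiveserver2-host:10000",
   })
   ```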


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org