You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/06/23 18:53:20 UTC

[GitHub] [hudi] bschell opened a new pull request #1760: [HUDI-1040] Update apis for spark3

bschell opened a new pull request #1760:
URL: https://github.com/apache/hudi/pull/1760


   Modifies use of spark apis for compatibility with both spark2 and spark3
   
   ## What is the purpose of the pull request
   
   Updates spark apis and allows compatibility with spark3
   
   ## Verify this pull request
   
   Existing tests verify the functionality for spark2.
   Manual tests for spark3 have been performed on EMR clusters. 
   You can verify by building Hudi successfully with spark v3.0.0
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r454038901



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       Thanks for the suggestion! I will try making the same change as that commit you linked but I am going to see if I can test the performance impact first.
   
   If you have any more details on how it affected performance please let me know.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713049606


   > Got hive class error
   > 
   > ```
   > Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
   > 	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
   > 	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
   > 	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
   > 	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
   > 	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
   > 	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:59)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:206)
   > 	at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:81)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
   > 	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
   > 	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
   > 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
   > 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
   > 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   > 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
   > 	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
   > 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   > 	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
   > 	at bb.gtd.de.CanalJsonToOds$.initSchema(CanalJsonToOds.scala:81)
   > 	at bb.gtd.de.CanalJsonToOds$.delayedEndpoint$bb$gtd$de$CanalJsonToOds$1(CanalJsonToOds.scala:42)
   > 	at bb.gtd.de.CanalJsonToOds$delayedInit$body.apply(CanalJsonToOds.scala:11)
   > 	at scala.Function0.apply$mcV$sp(Function0.scala:39)
   > 	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
   > 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
   > 	at scala.App.$anonfun$main$1$adapted(App.scala:80)
   > 	at scala.collection.immutable.List.foreach(List.scala:431)
   > 	at scala.App.main(App.scala:80)
   > 	at scala.App.main$(App.scala:78)
   > 	at bb.gtd.de.CanalJsonToOds$.main(CanalJsonToOds.scala:11)
   > 	at bb.gtd.de.CanalJsonToOds.main(CanalJsonToOds.scala)
   > 20/10/14 15:18:12 INFO SparkContext: Invoking stop() from shutdown hook
   > 20/10/14 15:18:12 INFO AbstractConnector: Stopped Spark@64b31700{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
   > 20/10/14 15:18:12 INFO SparkUI: Stopped Spark web UI at http://192.168.200.57:4041
   > 20/10/14 15:18:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   > 20/10/14 15:18:12 INFO MemoryStore: MemoryStore cleared
   > 20/10/14 15:18:12 INFO BlockManager: BlockManager stopped
   > 20/10/14 15:18:12 INFO BlockManagerMaster: BlockManagerMaster stopped
   > 20/10/14 15:18:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   > 20/10/14 15:18:12 INFO SparkContext: Successfully stopped SparkContext
   > 20/10/14 15:18:12 INFO ShutdownHookManager: Shutdown hook called
   > 20/10/14 15:18:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/vv/5d3clfpj22q_c12ghwdnpfl80000gn/T/spark-a0c47cb6-1512-4f8c-8d10-38e58796fed6
   > Disconnected from the target VM, address: '127.0.0.1:50019', transport: 'socket'
   > ```
   
   I suspect that this is because Spark 3.0.0 uses Hive [2.3.7](https://github.com/apache/spark/blob/v3.0.0/pom.xml#L130) but Spark 2.x uses Hive [1.2.1.spark2](https://github.com/apache/spark/blob/v2.4.0/pom.xml#L129) and this causes some API conflict. 
   
   Looks like this signature: ```org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)``` only exists in Hive ```1.2.1.spark2```: https://github.com/JoshRosen/hive/blob/release-1.2.1-spark2/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java#L101 but no longer exists in Hive ```2.3.7```: https://github.com/apache/hive/blob/rel/release-2.3.7/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java.
   
   If we compile with Spark 2 and then run with Spark 3, we might run into this kind of issue.
   
   But I didn't see any Hudi class in the error stack, can you provide more information about this error?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713049606


   > RetryingMetaStoreClient
   
   I suspect that this is because Spark 3.0.0 uses Hive [2.3.7](https://github.com/apache/spark/blob/v3.0.0/pom.xml#L130) but Spark 2.x uses Hive [1.2.1.spark2](https://github.com/apache/spark/blob/v2.4.0/pom.xml#L129) and this causes some API conflict. 
   
   Looks like this signature: ```org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)``` only exists in Hive ```1.2.1.spark2```: https://github.com/JoshRosen/hive/blob/release-1.2.1-spark2/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java#L101 but no longer exists in Hive ```2.3.7```: https://github.com/apache/hive/blob/rel/release-2.3.7/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java.
   
   If we compile with Spark 2 and then run with Spark 3, we will run into this kind of issue.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696339666


   @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   
   `diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh
   index 63fb959c..b77b4f64 100755
   --- a/scripts/run_travis_tests.sh
   +++ b/scripts/run_travis_tests.sh
   @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
      export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
      mkdir /tmp/spark-events/
      echo "Running Integration Tests"
   -  mvn verify -Pintegration-tests -B
   +  mvn verify -Pintegration-tests -Dscala-2.12 -B
    else
      echo "Unknown mode $mode"
      exit 1
   `


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-674457329


   @bschell : cool. I will let you drive the patch then. I gave it a shot thinking if we can get it to 0.6.0. But we couldn't make it. So, will let you drive this. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-712391147


   @bschell @vinothchandar to make clear, just wondering what is the exact goal for this pr? Do we want to make Hudi support both compile & run with spark 3 or we want to make Hudi compile with spark 2 and then run with spark3?
   
   Ideally we should make Hudi both compile and run with Spark3. But current code change cannot compile with spark 3.
   
   Run
   
   ```
   mvn clean install -DskipTests -DskipITs -Dspark.version=3.0.0 -Pscala-2.12
   ```
   
   returns
   
   ```
   [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project hudi-client: Compilation failure
   [ERROR] /Users/wenningd/workplace/Aws157Hudi/src/Aws157Hudi/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java:[146,27] cannot find symbol
   [ERROR]   symbol:   method toRow(org.apache.spark.sql.Row)
   [ERROR]   location: variable encoder of type org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
   [ERROR] 
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-685864029


   May not work actually 
   
   ```
   class A {
       def doSomething(arg: Int) = {
       }
     }
   
     @Test def testReflectionPerformance(): Unit = {
       // warmup the jvm and ignore.
       val a = new A()
       (1 to 100000000).foreach(_ => a.doSomething(1))
   
       val normalStartMs = System.currentTimeMillis();
       (1 to 100000000).foreach(_ => a.doSomething(1))
       println(s"Normal elapsed : ${System.currentTimeMillis() - normalStartMs}")
   
       val reflectStartMs = System.currentTimeMillis();
       val method = a.getClass.getMethods.filter(method => method.getName.equals("doSomething")).last
       (1 to 100000000).foreach(_ => method.invoke(a, 1.asInstanceOf[Object]))
       println(s"Reflect elapsed : ${System.currentTimeMillis() - reflectStartMs}")
     }
   ```
   
   gives the following results
   
   ```
   Normal elapsed : 30
   Reflect elapsed : 312
   ```
   
   let's abandon the reflection approach altogether. Either we munge the APIs somehow or just make a separate module for spark3 . e.g `hudi-spark3` with its own bundling. This may not be a bad choice per se. wdyt


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-673051331


   @nsivabalan Looks like a recent commit made this Class private: https://github.com/apache/spark/commit/ce2cdc36e29742dda22200963cfd3f9876170455 
   
   However I think we can workaround this by porting the code for those methods over to Hudi (it's not too long).
   For example:
   
   
   [spark3.txt](https://github.com/apache/hudi/files/5065062/spark3.txt)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713294221


   > @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   > 
   > ```diff
   > index 63fb959c..b77b4f64 100755
   > --- a/scripts/run_travis_tests.sh
   > +++ b/scripts/run_travis_tests.sh
   > @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
   >    export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
   >    mkdir /tmp/spark-events/
   >    echo "Running Integration Tests"
   > -  mvn verify -Pintegration-tests -B
   > +  mvn verify -Pintegration-tests -Dscala-2.12 -B
   >  else
   >    echo "Unknown mode $mode"
   >    exit 1
   > ```
   
   Is this a permanent change or we just try to run test here. I ran into a scala class not found error when running docker integ testing for Hudi:
   
   ```
   20/10/21 00:04:17 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
   Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.JavaConversions$.deprecated$u0020asScalaIterator(Ljava/util/Iterator;)Lscala/collection/Iterator;
   	at org.apache.hudi.IncrementalRelation.<init>(IncrementalRelation.scala:78)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:95)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
   	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
   	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
   	at org.apache.hudi.utilities.sources.HoodieIncrSource.fetchNextBatch(HoodieIncrSource.java:122)
   	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
   	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75)
   	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:68)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:364)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:253)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
   
   I suspect that although Hudi is able to pick the ```hudi-spark_2.12-bundle.jar``` but since the docker environment still uses spark_2.11, so there's still some conflict between scala 2.11 & 2.12.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
sbernauer commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r452348820



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       I think `org.apache.spark.SPARK_VERSION` could help us out here. I've implemented it in this commit: https://github.com/sbernauer/hudi/commit/a4f1866f5be56639958479e9a597ae8c4d3d8f4f. But I noticed that it had a huge performance impact (I assume due to the reflection)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r444564497



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       wondering if we can use `SparkContext.version` or something to figure it out upfront..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672262343


   @bschell : I gave it a shot on this. I don't have permission to push to your branch to update this PR. 
   Diff1: adding support to spark 3. but does not upgrade spark version in pom.xml : https://github.com/apache/hudi/pull/1950
   Diff2: also upgrades spark version to 3.0.0 : https://github.com/apache/hudi/pull/1951
   Diff2 results in [compilation failure](https://github.com/apache/hudi/pull/1951#issuecomment-672261179) as of the classes that Hoodie uses(SparkHadoopUtil) is not accessible anymore.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r482174333



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       @sbernauer 's check is better I think. 
   
   On the reflection itself, @bschell would it help if we cached the method objects, instead of recreating everytime. have you attempted anything like that? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] anniejacob commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
anniejacob commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-704722837


   @vinothchandar Has Hudi been tested for compatibility with spark v3.0.0 ? did a mvn build but pyspark throws a dependency error - org.apache.hudi#hudi-spark-bundle_2.12;0.6.1: not found


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r466890310



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       @nsivabalan are you able to take a swing at this for 0.6.0? this would be good to have 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696264459


   @bschell our spark install may be 2.11 on these images. As for hudi_spark_2.12 bundle, if we run integ-test with 2_12, I think it would happen automatically?
   @bvaradar should we switch to 2.12 and repush the images? 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r454038901



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       Thanks for the suggestion! I will try making the same change as that commit you linked but I am going to see if I can test the performance impact first.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r444563924



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       how expensive would this be in practice? any thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-712505539


   >we want to make Hudi compile with spark 2 and then run with spark3?
   
   this was the intention. but as @bschell pointed out some classes have changed and we need to make parts of `hudi-spark` modular and plugin spark version specific implementations. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-697565174


   @bschell : Did you try the above change I mentioned ? Let me know 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696264459


   @bschell our spark install may be 2.11 on these images. As for hudi_spark_2.12 bundle, if we run integ-test with 2_12, I think it would happen automatically?
   @bvaradar should we switch to 2.12 and repush the images? 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-686648563


   @vinothchandar Thank you for the suggestions! I agree with this approach and will try to update this PR to match.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bvaradar edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696339666


   @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   
   ```diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh
   index 63fb959c..b77b4f64 100755
   --- a/scripts/run_travis_tests.sh
   +++ b/scripts/run_travis_tests.sh
   @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
      export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
      mkdir /tmp/spark-events/
      echo "Running Integration Tests"
   -  mvn verify -Pintegration-tests -B
   +  mvn verify -Pintegration-tests -Dscala-2.12 -B
    else
      echo "Unknown mode $mode"
      exit 1
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-701531946


   @bschell if you could expand on them, we can hash out a solution. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-701529641


   In our testing we've found some larger scale issues with this approach and conflicts with hudi-0.6.0 refactors. Mainly with spark datasource api interface changes, need to reevaluate this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bvaradar edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696339666


   @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   
   ```diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh
   index 63fb959c..b77b4f64 100755
   --- a/scripts/run_travis_tests.sh
   +++ b/scripts/run_travis_tests.sh
   @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
      export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
      mkdir /tmp/spark-events/
      echo "Running Integration Tests"
   -  mvn verify -Pintegration-tests -B
   +  mvn verify -Pintegration-tests -Dscala-2.12 -B
    else
      echo "Unknown mode $mode"
      exit 1
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-695062230


   Seems like travis doesn't have the hudi_spark_2.12 bundle available, only the 2.11. Any suggestions?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713993418


   Found another runtime error when updating MOR table:
   ```
   java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(Lorg/apache/spark/sql/SparkSession;Lscala/collection/Seq;Lscala/collection/immutable/Map;Lscala/Option;Lorg/apache/spark/sql/execution/datasources/FileStatusCache;)V
     at org.apache.hudi.HoodieSparkUtils$.createInMemoryFileIndex(HoodieSparkUtils.scala:72)
     at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:127)
     at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:87)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:339)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:279)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:268)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:268)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:214)
     ... 57 elided
   ```
   
   This is because the signature of ```org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>``` is different between spark 2.4.4 & spark 3.0.0.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713294221


   > @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   > 
   > ```diff
   > index 63fb959c..b77b4f64 100755
   > --- a/scripts/run_travis_tests.sh
   > +++ b/scripts/run_travis_tests.sh
   > @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
   >    export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
   >    mkdir /tmp/spark-events/
   >    echo "Running Integration Tests"
   > -  mvn verify -Pintegration-tests -B
   > +  mvn verify -Pintegration-tests -Dscala-2.12 -B
   >  else
   >    echo "Unknown mode $mode"
   >    exit 1
   > ```
   
   Is this a permanent change or we just try to run test here? @bschell @vinothchandar @bvaradar 
   
   I ran into a scala class not found error when running docker integ testing for Hudi:
   
   ```
   20/10/21 00:04:17 WARN SparkContext: Using an existing SparkContext; some configuration may not take effect.
   Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.JavaConversions$.deprecated$u0020asScalaIterator(Ljava/util/Iterator;)Lscala/collection/Iterator;
   	at org.apache.hudi.IncrementalRelation.<init>(IncrementalRelation.scala:78)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:95)
   	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
   	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
   	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
   	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
   	at org.apache.hudi.utilities.sources.HoodieIncrSource.fetchNextBatch(HoodieIncrSource.java:122)
   	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
   	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:75)
   	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:68)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:364)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:253)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:163)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:161)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:466)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
   
   I suspect that although Hudi is able to pick the ```hudi-spark_2.12-bundle.jar``` but since the docker environment still uses spark_2.11, so there's still some conflict between scala 2.11 & 2.12.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-683872339


   @bschell this is a heavily requested feature. are you still working on this? :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] umehrot2 closed pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
umehrot2 closed pull request #1760:
URL: https://github.com/apache/hudi/pull/1760


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] giaosudau edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
giaosudau edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-708212239


   Got hive class error 
   ```20/10/14 15:18:12 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is /Users/giaosudau/workspace/etl-pipelines/spark-warehouse
   Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
   	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
   	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
   	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
   	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
   	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
   	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
   	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
   	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
   	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
   	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
   	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
   	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:59)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:206)
   	at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:81)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
   	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
   	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
   	at bb.gtd.de.CanalJsonToOds$.initSchema(CanalJsonToOds.scala:81)
   	at bb.gtd.de.CanalJsonToOds$.delayedEndpoint$bb$gtd$de$CanalJsonToOds$1(CanalJsonToOds.scala:42)
   	at bb.gtd.de.CanalJsonToOds$delayedInit$body.apply(CanalJsonToOds.scala:11)
   	at scala.Function0.apply$mcV$sp(Function0.scala:39)
   	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
   	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
   	at scala.App.$anonfun$main$1$adapted(App.scala:80)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at scala.App.main(App.scala:80)
   	at scala.App.main$(App.scala:78)
   	at bb.gtd.de.CanalJsonToOds$.main(CanalJsonToOds.scala:11)
   	at bb.gtd.de.CanalJsonToOds.main(CanalJsonToOds.scala)
   20/10/14 15:18:12 INFO SparkContext: Invoking stop() from shutdown hook
   20/10/14 15:18:12 INFO AbstractConnector: Stopped Spark@64b31700{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
   20/10/14 15:18:12 INFO SparkUI: Stopped Spark web UI at http://192.168.200.57:4041
   20/10/14 15:18:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   20/10/14 15:18:12 INFO MemoryStore: MemoryStore cleared
   20/10/14 15:18:12 INFO BlockManager: BlockManager stopped
   20/10/14 15:18:12 INFO BlockManagerMaster: BlockManagerMaster stopped
   20/10/14 15:18:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   20/10/14 15:18:12 INFO SparkContext: Successfully stopped SparkContext
   20/10/14 15:18:12 INFO ShutdownHookManager: Shutdown hook called
   20/10/14 15:18:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/vv/5d3clfpj22q_c12ghwdnpfl80000gn/T/spark-a0c47cb6-1512-4f8c-8d10-38e58796fed6
   Disconnected from the target VM, address: '127.0.0.1:50019', transport: 'socket'
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-685998284


   @bschell here's a path forward. 
   
   - We can create an abstraction for deserializing : `RowDeserializer` and which implements row. deser differently based on spark 2 and spark 3. 
   - Spark 2 impl lives in `hudi-spark` and we pick the deser impl based on spark version within `hudi-spark` itself using reflection. Note that we will create the instance alone this way, i.e reflection used only for initing the deserializer object
   - lets create a new module `hudi-spark3` which contain just the one class that implementation the deserialization using spark 3 apis, we will override the spark version to 3 for that module alone. i.e this class alone gets compiled against spark 3. (this needs to be confirmed)
   - we include `hudi-spark3` in the spark bundle. Spark is anyway a provided dependency, so as long as we invoke the spark3 deserializer only with the spark 3 jars around, things should work well. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-701540715


   @vinothchandar We are compiling the issues right now. I will update here once we are finished later today/tomorrow


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] aniejo commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
aniejo commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-706900383


   @vinothchandar @bschell  appreciate any guidance regarding this error, I am using COW hudi options on Spark 3.0
   pyspark --packages org.apache.hudi:hudi-spark-bundle_2.12:0.6.1-SNAPSHOT,org.apache.spark:spark-avro_2.12:3.0.1 
   
   df.write.format("hudi"). \
     options(**hudi_options). \
     mode("overwrite"). \
     save(basePath)
     
   On writing df as hudi parquet , it throws the below error : 
   
   to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, 6049cd42243a, executor driver): java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
           at org.apache.hudi.AvroConversionHelper$.createConverterToAvro(AvroConversionHelper.scala:344)
           at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:50)
           at org.apache.hudi.AvroConversionUtils$$anonfun$2.apply(AvroConversionUtils.scala:47)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
           at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
           at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1531)
           at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1531)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
           at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:162)
           at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:125)
           at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
           at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
           at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
           at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
           at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
           at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
           at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
           at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
           at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
           at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
           at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
           at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
           at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
           at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
           at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
           at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
           at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
           at py4j.Gateway.invoke(Gateway.java:282)
           at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
           at py4j.commands.CallCommand.execute(CallCommand.java:79)
           at py4j.GatewayConnection.run(GatewayConnection.java:238)
           at java.lang.Thread.run(Thread.java:748)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] umehrot2 commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-720904231


   The work for spark 3 has been moved over to https://github.com/apache/hudi/pull/2208 . Hence closing this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672053354


   @bschell is this tested and ready to go? would like to get it into the RC if possible
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r444566173



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       I think checking spark version would be a good idea to prevent the failed call everytime, let me look into it. I don't believe the reflection performance is significant as a whole, especially if we can figure out the spark version upfront. We do something similar here:
   https://github.com/apache/hudi/pull/1638/commits/9f1284374e72717222c51ea681dc2a5ceb696a50




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-713049606


   > Got hive class error
   > 
   > ```
   > Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
   > 	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
   > 	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
   > 	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
   > 	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
   > 	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
   > 	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
   > 	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
   > 	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
   > 	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:59)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
   > 	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:206)
   > 	at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:81)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   > 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
   > 	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
   > 	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
   > 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
   > 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
   > 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   > 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
   > 	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
   > 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   > 	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
   > 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   > 	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
   > 	at bb.gtd.de.CanalJsonToOds$.initSchema(CanalJsonToOds.scala:81)
   > 	at bb.gtd.de.CanalJsonToOds$.delayedEndpoint$bb$gtd$de$CanalJsonToOds$1(CanalJsonToOds.scala:42)
   > 	at bb.gtd.de.CanalJsonToOds$delayedInit$body.apply(CanalJsonToOds.scala:11)
   > 	at scala.Function0.apply$mcV$sp(Function0.scala:39)
   > 	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
   > 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
   > 	at scala.App.$anonfun$main$1$adapted(App.scala:80)
   > 	at scala.collection.immutable.List.foreach(List.scala:431)
   > 	at scala.App.main(App.scala:80)
   > 	at scala.App.main$(App.scala:78)
   > 	at bb.gtd.de.CanalJsonToOds$.main(CanalJsonToOds.scala:11)
   > 	at bb.gtd.de.CanalJsonToOds.main(CanalJsonToOds.scala)
   > 20/10/14 15:18:12 INFO SparkContext: Invoking stop() from shutdown hook
   > 20/10/14 15:18:12 INFO AbstractConnector: Stopped Spark@64b31700{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
   > 20/10/14 15:18:12 INFO SparkUI: Stopped Spark web UI at http://192.168.200.57:4041
   > 20/10/14 15:18:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   > 20/10/14 15:18:12 INFO MemoryStore: MemoryStore cleared
   > 20/10/14 15:18:12 INFO BlockManager: BlockManager stopped
   > 20/10/14 15:18:12 INFO BlockManagerMaster: BlockManagerMaster stopped
   > 20/10/14 15:18:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   > 20/10/14 15:18:12 INFO SparkContext: Successfully stopped SparkContext
   > 20/10/14 15:18:12 INFO ShutdownHookManager: Shutdown hook called
   > 20/10/14 15:18:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/vv/5d3clfpj22q_c12ghwdnpfl80000gn/T/spark-a0c47cb6-1512-4f8c-8d10-38e58796fed6
   > Disconnected from the target VM, address: '127.0.0.1:50019', transport: 'socket'
   > ```
   
   I suspect that this is because Spark 3.0.0 uses Hive [2.3.7](https://github.com/apache/spark/blob/v3.0.0/pom.xml#L130) but Spark 2.x uses Hive [1.2.1.spark2](https://github.com/apache/spark/blob/v2.4.0/pom.xml#L129) and this causes some API conflict. 
   
   Looks like this signature: ```org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)``` only exists in Hive ```1.2.1.spark2```: https://github.com/JoshRosen/hive/blob/release-1.2.1-spark2/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java#L101 but no longer exists in Hive ```2.3.7```: https://github.com/apache/hive/blob/rel/release-2.3.7/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java.
   
   If we compile with Spark 2 and then run with Spark 3, we will run into this kind of issue.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-696339666


   @bschell : For running integration tests with hudi packages built with scala 2.12, we just need to change scripts/run_travis_tests.sh. The docker container should automatically load those jars for running integration tests.
   
   `diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh
   index 63fb959c..b77b4f64 100755
   --- a/scripts/run_travis_tests.sh
   +++ b/scripts/run_travis_tests.sh
   @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then
      export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion}
      mkdir /tmp/spark-events/
      echo "Running Integration Tests"
   -  mvn verify -Pintegration-tests -B
   +  mvn verify -Pintegration-tests -Dscala-2.12 -B
    else
      echo "Unknown mode $mode"
      exit 1
   `


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672067962


   @vinothchandar While this works, the reflection does hurt performance as this is a frequently used path. I was looking into any better options to workaround the performance hit.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-720399687






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r444578021



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       I was concerned about the exception thrown each time.. in the case you mentioned, its obtaining the description once.. this would be in the fast path.. Anyways.. lets hope the version thing is feasible..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] giaosudau commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
giaosudau commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-708212239


   Got hive class error 
   ```20/10/14 15:18:12 INFO HiveClientImpl: Warehouse location for Hive client (version 2.3.7) is /Users/chanhtrungle/workspace/etl-pipelines/spark-warehouse
   Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Lorg/apache/hadoop/hive/conf/HiveConf;Lorg/apache/hadoop/hive/metastore/HiveMetaHookLoader;Ljava/util/concurrent/ConcurrentHashMap;Ljava/lang/String;Z)Lorg/apache/hadoop/hive/metastore/IMetaStoreClient;
   	at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3600)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3652)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3632)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getAllFunctions(Hive.java:3894)
   	at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:248)
   	at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
   	at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
   	at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
   	at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
   	at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:260)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:286)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
   	at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:389)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:221)
   	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
   	at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:221)
   	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:137)
   	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:127)
   	at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:157)
   	at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:155)
   	at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$2(HiveSessionStateBuilder.scala:59)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:93)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:93)
   	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:206)
   	at org.apache.spark.sql.execution.command.CreateDatabaseCommand.run(ddl.scala:81)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
   	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
   	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
   	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
   	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
   	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
   	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
   	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
   	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:606)
   	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
   	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
   	at bb.gtd.de.CanalJsonToOds$.initSchema(CanalJsonToOds.scala:81)
   	at bb.gtd.de.CanalJsonToOds$.delayedEndpoint$bb$gtd$de$CanalJsonToOds$1(CanalJsonToOds.scala:42)
   	at bb.gtd.de.CanalJsonToOds$delayedInit$body.apply(CanalJsonToOds.scala:11)
   	at scala.Function0.apply$mcV$sp(Function0.scala:39)
   	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
   	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
   	at scala.App.$anonfun$main$1$adapted(App.scala:80)
   	at scala.collection.immutable.List.foreach(List.scala:431)
   	at scala.App.main(App.scala:80)
   	at scala.App.main$(App.scala:78)
   	at bb.gtd.de.CanalJsonToOds$.main(CanalJsonToOds.scala:11)
   	at bb.gtd.de.CanalJsonToOds.main(CanalJsonToOds.scala)
   20/10/14 15:18:12 INFO SparkContext: Invoking stop() from shutdown hook
   20/10/14 15:18:12 INFO AbstractConnector: Stopped Spark@64b31700{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
   20/10/14 15:18:12 INFO SparkUI: Stopped Spark web UI at http://192.168.200.57:4041
   20/10/14 15:18:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   20/10/14 15:18:12 INFO MemoryStore: MemoryStore cleared
   20/10/14 15:18:12 INFO BlockManager: BlockManager stopped
   20/10/14 15:18:12 INFO BlockManagerMaster: BlockManagerMaster stopped
   20/10/14 15:18:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   20/10/14 15:18:12 INFO SparkContext: Successfully stopped SparkContext
   20/10/14 15:18:12 INFO ShutdownHookManager: Shutdown hook called
   20/10/14 15:18:12 INFO ShutdownHookManager: Deleting directory /private/var/folders/vv/5d3clfpj22q_c12ghwdnpfl80000gn/T/spark-a0c47cb6-1512-4f8c-8d10-38e58796fed6
   Disconnected from the target VM, address: '127.0.0.1:50019', transport: 'socket'
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672262343


   @bschell : I have it a shot on this. I don't have permission to push to your branch to update this PR. 
   Diff1: adding support to spark 3. but does not upgrade spark version in pom.xml : https://github.com/apache/hudi/pull/1950
   Diff2: also upgrades spark version to 3.0.0 : https://github.com/apache/hudi/pull/1951
   Diff2 results in compilation failure as of the classes that Hoodie uses(SparkHadoopUtil) is not accessible anymore.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672262343


   @bschell : I gave it a shot on this. I don't have permission to push to your branch to update this PR. 
   Diff1: adding support to spark 3. but does not upgrade spark version in pom.xml : https://github.com/apache/hudi/pull/1950
   Diff2: also upgrades spark version to 3.0.0 : https://github.com/apache/hudi/pull/1951
   Diff2 results in [compilation failure](https://github.com/apache/hudi/pull/1951#issuecomment-672261179) as one of the classes that Hoodie uses(SparkHadoopUtil) is not accessible anymore.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-672262343


   @bschell @vinothchandar : I gave it a shot on this. I don't have permission to push to your branch to update this PR. 
   Diff1: adding support to spark 3. but does not upgrade spark version in pom.xml : https://github.com/apache/hudi/pull/1950
   Diff2: also upgrades spark version to 3.0.0 : https://github.com/apache/hudi/pull/1951
   Diff2 results in [compilation failure](https://github.com/apache/hudi/pull/1951#issuecomment-672261179) as one of the classes that Hoodie uses(SparkHadoopUtil) is not accessible anymore.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
sbernauer commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r452780045



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       I would suggest to change the method to something like this
   ```
   import org.apache.spark.SPARK_VERSION
   
     private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
       // TODO remove reflection if Spark 2.x support is dropped
       if (SPARK_VERSION.startsWith("2.")) {
         val spark2method = encoder.getClass.getMethod("fromRow", classOf[InternalRow])
         spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
       } else {
         val deserializer = encoder.getClass.getMethod("createDeserializer").invoke(encoder)
         val aboveSpark2method = deserializer.getClass.getMethod("apply", classOf[InternalRow])
         aboveSpark2method.invoke(deserializer, internalRow).asInstanceOf[Row]
       }
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bschell commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
bschell commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-700137656


   Looks like only integ tests are failing, but I'm not sure of the error. Will take a closer look.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-706791396


   @aniejo there are some known issues since some spark APIs have changed in 3. 
   
   @bschell any updates for us? This is being requested heavily, love to do this sooner if possible. are we blocked on something? 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on a change in pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
sbernauer commented on a change in pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#discussion_r455542800



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -78,4 +79,21 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  private def deserializeRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow): Row = {
+    // First attempt to use spark2 API for deserialization, otherwise attempt with spark3 API
+    try {
+      val spark2method = encoder.getClass.getMethods.filter(method => method.getName.equals("fromRow")).last
+      spark2method.invoke(encoder, internalRow).asInstanceOf[Row]
+    } catch {
+      case e: NoSuchElementException => spark3Deserialize(encoder, internalRow)

Review comment:
       The commit with all the patches (including better reflection) is this here: https://github.com/sbernauer/hudi/commit/0dd7172521a92d9a13425dde4a23c565cb37a533




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] zhedoubushishi edited a comment on pull request #1760: [HUDI-1040] Update apis for spark3 compatibility

Posted by GitBox <gi...@apache.org>.
zhedoubushishi edited a comment on pull request #1760:
URL: https://github.com/apache/hudi/pull/1760#issuecomment-712391147


   @bschell @vinothchandar to make clear, just wondering what is the exact goal for this pr? Do we want to make Hudi support both compile & run with spark 3 or we want to make Hudi compile with spark 2 and then run with spark3?
   
   Ideally we should make Hudi both compile and run with Spark3. But current code change cannot compile with spark 3.
   
   Run
   
   ```
   mvn clean install -DskipTests -DskipITs -Dspark.version=3.0.0 -Pscala-2.12
   ```
   
   returns
   
   ```
   [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project hudi-client: Compilation failure
   [ERROR] /Users/wenningd/workplace/hudi/hudi-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java:[146,27] cannot find symbol
   [ERROR]   symbol:   method toRow(org.apache.spark.sql.Row)
   [ERROR]   location: variable encoder of type org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
   [ERROR] 
   
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org