You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/05/02 09:31:00 UTC

[GitHub] [incubator-hudi] xushiyan opened a new pull request #1584: fix schema provider issue

xushiyan opened a new pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584


   When no new data is fetched after reading from row source (like parquet), schema provider cannot be inferred and calling `getSchemaProvider()` results in HoodieException and requires schema provider to be set. This is not an ideal case for reading from schema-inferable source. Return `null` schema provider in case of no new data should be allowed.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-628299745


   @pratyakshsharma I had a new proposed change reflected in the last commit. 
   
   The idea is to not throw exception when no data is fetched, so this is to loosen a bit on throwing exception and asking user to set the class. If any data is fetched, then it is still the old requirement on setting schema provider.
   
   This should work for ROW source case where users can totally forget about schema provider setting.
   
   cc @vinothchandar @afilipchik 
   
   Please kindly verify the changes and see if the proposal works or if i overlooked any side effect. Thank you.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419150921



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -276,6 +276,8 @@ private void refreshTimeline() throws IOException {
       // to generic records for writing
       InputBatch<Dataset<Row>> dataAndCheckpoint =
           formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+      SchemaProvider schemaProviderFromFetched = dataAndCheckpoint.getBatch().isPresent()
+          ? dataAndCheckpoint.getSchemaProvider() : null;

Review comment:
       when fetching in row format, this change should not be needed since RowBasedSchemaProvider is already getting initialised at the end. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r422366002



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       > I'm referring tofetchNewDataInAvroFormat() being called when transformer is not set in case of ROW type source and no new data
   
   Yeah I got that. I was actually thinking on the following lines - Avro relies on schemas. So the original thinking behind writing the function `fetchNewDataInAvroFormat()` might have been that schema provider should be pre specified. 
   
   > it conflicts with ROW source getting implicit schema. It seems like a usability issue.
   
   But the above point is valid. I guess you are correct :) 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] bvaradar merged pull request #1584: [HUDI-902] Avoid exception when getSchemaProvider

Posted by GitBox <gi...@apache.org>.
bvaradar merged pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419148578



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {
+          return new InputBatch<>(r.getBatch().map(
+              rdd -> (
+                  (r.getSchemaProvider() instanceof FilebasedSchemaProvider)

Review comment:
       So this is the line where you mentioned the exception is thrown for you. Am I correct? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-622926124


   @vinothchandar please kindly verify the described scenario above. In case I misunderstood some logic or use case, I'll close the PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-625445181


   > Probably outside of the scope of this PR, but, would it be a good idea to split schema provider into 2 separate source and target ones? I contributed null target schema provider and it can be avoided by just assuming that either source or target can be inferred (from source or from transformation)
   
   @afilipchik Thanks for sharing this info. It does sound good idea to have some flexibility there.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419150151



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       If I understand correctly, not specifying schema provider should be feasible in case of row based sources when you try to fetch the data in row format (i.e when using transformers).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-623232249


   @pratyakshsharma thanks for checking..will circle back to your comments later. :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419919337



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       > If I understand correctly, not specifying schema provider should be feasible in case of row based sources when you try to fetch the data in row format (i.e when using transformers).
   
   hi @pratyakshsharma I don't quite get this point. I'm referring to`fetchNewDataInAvroFormat()` being called when transformer is not set
   https://github.com/apache/incubator-hudi/blob/14d4fea8339913c0df8ea829036a45a187c55208/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L305-L306
   in case of ROW type source and no new data, `r.getSchemaProvider()` will throw an exception. 
   
   An example use case: reading a parquet source (ROW format) without setting either transformer or schemaprovider class, it works when new data keeps coming. But an exception will be thrown when no new data detected, calling `r.getSchemaProvider()` in this switch case will ask user to set schemaprovider.
   
   If in this example we still require users to set schema provider (like the built-in RowBasedSchemaProvider), then it conflicts with ROW source getting implicit schema. It seems like a usability issue.
   
   Could you confirm this understanding is accurate please?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-628828057


   To reduce context switch, assigning to @bvaradar who is looking into couple other PRs around schema provider 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419149028



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       I think since this method tries to fetch data in avro format, pre specifying a schema provider is mandatory. So even if you do not get any data, you should mention RowBasedSchemaProvider as the schema provider in the very beginning. If that is done, there is no need to do this change I believe. :) 
   Do you face issues after pre specifying schema provider? 
   Please let me know your thoughts on this. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419919337



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       > If I understand correctly, not specifying schema provider should be feasible in case of row based sources when you try to fetch the data in row format (i.e when using transformers).
   
   hi @pratyakshsharma I don't quite get this point. I'm referring to`fetchNewDataInAvroFormat()` being called when transformer is not set
   https://github.com/apache/incubator-hudi/blob/14d4fea8339913c0df8ea829036a45a187c55208/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L305-L306
   in case of ROW type source and no new data, `r.getSchemaProvider()` will throw an exception. 
   
   An example use case: reading a parquet source (ROW format) without setting either transformer or schemaprovider class, it works when new data keeps coming. But an exception will be thrown when no new data detected, calling `r.getSchemaProvider()` in this switch case will ask user to set schemaprovider.
   
   If in this example we still require users to set schema provider, then it conflicts with ROW source getting implicit schema. It seems like a usability issue.
   
   Could you confirm this understanding is accurate please?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-629472943


   Ok @bvaradar thanks for checking. I shall be able to do it in the late afternoon.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on pull request #1584: [HUDI-902] Avoid exception when getSchemaProvider

Posted by GitBox <gi...@apache.org>.
xushiyan commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-629576325


   @bvaradar The CI passed. It's ready for review now. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] afilipchik commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
afilipchik commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-623841301


   Probably outside of the scope of this PR, but, would it be a good idea to split schema provider into 2 separate source and target ones? I contributed null target schema provider and it can be avoided by just assuming that either source or target can be inferred (from source or from transformation)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan edited a comment on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan edited a comment on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-628299745


   @pratyakshsharma I had a new proposed change reflected in the last commit. 
   
   The idea is to not throw exception when no data is fetched, so this is to loosen a bit on throwing exception and asking user to set the class. If any data is fetched, then it is still the old requirement on setting schema provider.
   
   This should work for ROW source case where users can totally forget about schema provider setting. For all data source types, we don't care about the schema if no data is fetched.
   
   cc @vinothchandar @afilipchik 
   
   Please kindly verify the changes and see if the proposal works or if i overlooked any side effect. Thank you.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r422367201



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -276,6 +276,8 @@ private void refreshTimeline() throws IOException {
       // to generic records for writing
       InputBatch<Dataset<Row>> dataAndCheckpoint =
           formatAdapter.fetchNewDataInRowFormat(resumeCheckpointStr, cfg.sourceLimit);
+      SchemaProvider schemaProviderFromFetched = dataAndCheckpoint.getBatch().isPresent()
+          ? dataAndCheckpoint.getSchemaProvider() : null;

Review comment:
       @xushiyan Here this change should not be needed. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan edited a comment on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan edited a comment on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-622926124


   @vinothchandar please kindly verify the described scenario above. If this make sense, I may add a test case for it. In case I misunderstood the logic or use case, I'll close the PR. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] bvaradar commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
bvaradar commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-629467290


   @xushiyan : The idea and code changes looks good to me. Can you add a jira ticket and add an unit-test to include this change. It would be great if you could get this to 0.5.3. If you cannot get this today, let me know and I will try to help 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-628315469


   Spending today and tomorrow on all the schema PRs.. stay tuned :) 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419151032



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -298,15 +300,15 @@ private void refreshTimeline() throws IOException {
       // default to RowBasedSchemaProvider
       schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
           ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
-          dataAndCheckpoint.getSchemaProvider())
+          schemaProviderFromFetched)
           : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
       InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
           formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, cfg.sourceLimit);
       avroRDDOptional = dataAndCheckpoint.getBatch();
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      schemaProvider = dataAndCheckpoint.getSchemaProvider();
+      schemaProvider = avroRDDOptional.isPresent() ? dataAndCheckpoint.getSchemaProvider() : null;

Review comment:
       please refer to my other comment on the changes in SourceFormatAdapter. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] xushiyan commented on a change in pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
xushiyan commented on a change in pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#discussion_r419919337



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
##########
@@ -64,19 +64,22 @@ public SourceFormatAdapter(Source source) {
       }
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
-        return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (
-                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
-                    // If the source schema is specified through Avro schema,
-                    // pass in the schema for the Row-to-Avro conversion
-                    // to avoid nullability mismatch between Avro schema and Row schema
-                    ? AvroConversionUtils.createRdd(
-                        rdd, r.getSchemaProvider().getSourceSchema(),
-                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                    : AvroConversionUtils.createRdd(
-                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
-                ))
-            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (r.getBatch().isPresent()) {

Review comment:
       > If I understand correctly, not specifying schema provider should be feasible in case of row based sources when you try to fetch the data in row format (i.e when using transformers).
   
   hi @pratyakshsharma I don't quite get this point. I'm referring to`fetchNewDataInAvroFormat()` being called when transformer is not set
   https://github.com/apache/incubator-hudi/blob/14d4fea8339913c0df8ea829036a45a187c55208/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L305-L306
   in case of ROW type source and no new data, `r.getSchemaProvider()` will throw an exception. 
   
   An example use case: reading a parquet source (ROW format) without setting either transformer or schemaprovider class, it works when new data keeps coming. But an exception will be thrown when no new data detected, calling `r.getSchemaProvider()` in this line below will throw and ask user to set schemaprovider.
   
   https://github.com/apache/incubator-hudi/blob/d0ee95ed16de6c3568f575169cb993b9c10ced3d/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java#L79
   
   If in this example we still require users to set schema provider (like the built-in RowBasedSchemaProvider), then it conflicts with ROW source getting implicit schema. It seems like a usability issue.
   
   Could you confirm this understanding is accurate please?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-625442569


   @xushiyan please allow me one more day's time, will address your comments then. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on pull request #1584: fix schema provider issue

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1584:
URL: https://github.com/apache/incubator-hudi/pull/1584#issuecomment-623133003


   Sure. will do tonight ! cc @pratyakshsharma as well. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org