Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/13 07:15:13 UTC

[GitHub] [incubator-hudi] hddong opened a new pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

hddong opened a new pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511
 
 
   ## What is the purpose of the pull request
   
   *In `HDFSParquetImporter`, `upsert` currently behaves the same as `insert`: it removes the old metadata, then recreates the table and inserts the data. An `upsert` should instead update matching records and insert new ones on top of the existing data.*
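
To make the distinction above concrete, here is a minimal sketch of the two behaviours, using a plain in-memory map as a stand-in for a table keyed by record key. `UpsertSketch`, `insertOverwrite`, and `upsert` are hypothetical names chosen for illustration; none of this is Hudi's API or the code changed in this pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UpsertSketch {

    // Behaviour described as the problem: existing data is discarded and
    // only the incoming batch survives. (Hypothetical helper, not Hudi code.)
    static Map<String, String> insertOverwrite(Map<String, String> existing,
                                               Map<String, String> incoming) {
        return new LinkedHashMap<>(incoming);
    }

    // Intended behaviour: records whose key already exists are updated,
    // records with new keys are inserted, and all other records are kept.
    static Map<String, String> upsert(Map<String, String> existing,
                                      Map<String, String> incoming) {
        Map<String, String> merged = new LinkedHashMap<>(existing);
        merged.putAll(incoming);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("key-1", "rider-A");
        existing.put("key-2", "rider-B");

        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("key-2", "rider-B-updated"); // same key: an update
        incoming.put("key-3", "rider-C");         // new key: an insert

        System.out.println("insert-like: " + insertOverwrite(existing, incoming));
        System.out.println("upsert:      " + upsert(existing, incoming));
    }
}
```

In the insert-like case `key-1` is lost, whereas the upsert keeps it alongside the updated `key-2` and the new `key-3`.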
   
   ## Brief change log
   
     - *Adjust logic of upsert in HDFSParquetImporter*
   
   ## Verify this pull request
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] hmatu commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hmatu commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r407815629
 
 

 ##########
 File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
 ##########
 @@ -100,6 +100,10 @@ public static void main(String[] args) {
 
   }
 
+  private boolean isUpsert() {
 
 Review comment:
   Use a boolean `flag` instead?

[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409395668
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
 
 Review comment:
   With this many arguments, I would suggest putting one field per line.
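
A minimal sketch of what one argument per line could look like. `TripModel` is a hypothetical, cut-down stand-in for the test's `HoodieModel` (only four of the eight fields), a `Map` stands in for `GenericRecord`, and `asDouble` is an assumed helper added here to keep each line short.

```java
import java.util.HashMap;
import java.util.Map;

class OneArgPerLineSketch {

    // Hypothetical model with a subset of the fields, for illustration only.
    static final class TripModel {
        final double timestamp;
        final String rowKey;
        final String rider;
        final double beginLat;

        TripModel(double timestamp, String rowKey, String rider, double beginLat) {
            this.timestamp = timestamp;
            this.rowKey = rowKey;
            this.rider = rider;
            this.beginLat = beginLat;
        }
    }

    // Assumed helper: converts a record field to a primitive double.
    static double asDouble(Object field) {
        return Double.parseDouble(field.toString());
    }

    // One constructor argument per line, in field order.
    static TripModel toModel(Map<String, Object> g) {
        return new TripModel(
            asDouble(g.get("timestamp")),
            g.get("_row_key").toString(),
            g.get("rider").toString(),
            asDouble(g.get("begin_lat")));
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("timestamp", 1.5);
        record.put("_row_key", "key-1");
        record.put("rider", "rider-A");
        record.put("begin_lat", 0.25);

        TripModel model = toModel(record);
        System.out.println(model.rowKey + " " + model.timestamp);
    }
}
```

With this layout, each argument can be matched to its constructor parameter at a glance, and a diff that changes one field touches exactly one line.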

[GitHub] [incubator-hudi] codecov-io commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612848674
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=h1) Report
   > Merging [#1511](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/f5f34bb1c16e6d070668486eba2a29f554c0bbc7&el=desc) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1511/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1511      +/-   ##
   ============================================
   - Coverage     72.15%   72.12%   -0.03%     
   + Complexity      290      289       -1     
   ============================================
     Files           338      365      +27     
     Lines         15929    16228     +299     
     Branches       1625     1632       +7     
   ============================================
   + Hits          11494    11705     +211     
   - Misses         3704     3790      +86     
   - Partials        731      733       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../org/apache/hudi/table/HoodieCopyOnWriteTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllQ29weU9uV3JpdGVUYWJsZS5qYXZh) | `69.64% <0.00%> (-19.65%)` | `0.00% <0.00%> (ø%)` | |
   | [...hudi/common/fs/inline/InLineFsDataInputStream.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGc0RhdGFJbnB1dFN0cmVhbS5qYXZh) | `38.46% <0.00%> (-15.39%)` | `0.00% <0.00%> (ø%)` | |
   | [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `0.00% <0.00%> (ø%)` | |
   | [.../org/apache/hudi/table/HoodieMergeOnReadTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllTWVyZ2VPblJlYWRUYWJsZS5qYXZh) | `73.33% <0.00%> (-9.80%)` | `0.00% <0.00%> (ø%)` | |
   | [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | `68.50% <0.00%> (-5.65%)` | `0.00% <0.00%> (ø%)` | |
   | [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `75.00% <0.00%> (-1.93%)` | `0.00% <0.00%> (ø%)` | |
   | [...n/java/org/apache/hudi/index/hbase/HBaseIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvaGJhc2UvSEJhc2VJbmRleC5qYXZh) | `83.25% <0.00%> (-0.96%)` | `0.00% <0.00%> (ø%)` | |
   | [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <0.00%> (-0.78%)` | `0.00% <0.00%> (ø%)` | |
   | [...s/deltastreamer/HoodieMultiTableDeltaStreamer.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllTXVsdGlUYWJsZURlbHRhU3RyZWFtZXIuamF2YQ==) | `78.39% <0.00%> (-0.49%)` | `18.00% <0.00%> (ø%)` | |
   | [...c/main/java/org/apache/hudi/table/HoodieTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllVGFibGUuamF2YQ==) | `79.64% <0.00%> (ø)` | `0.00% <0.00%> (ø%)` | |
   | ... and [39 more](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=footer). Last update [f5f34bb...e0c6fc5](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409394674
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
 
 Review comment:
   Can we use `try-with-resources` here?
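
A minimal sketch of the pattern being suggested. `FakeContext` is a hypothetical stand-in that only records whether `close()` was called; it is not Spark's `JavaSparkContext`. Applying the same shape to the test assumes that `JavaSparkContext` implements `java.io.Closeable` in the Spark version in use, which should be checked before adopting it.

```java
class TryWithResourcesSketch {

    // Hypothetical stand-in for a context object. It is not Spark code.
    static final class FakeContext implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Body succeeds: the context is closed automatically on exit,
    // replacing the explicit null check and stop() call in a finally block.
    static FakeContext runOk() {
        FakeContext seen = null;
        try (FakeContext ctx = new FakeContext()) {
            seen = ctx;
            // ... test body would go here ...
        }
        return seen;
    }

    // Body throws: the context is still closed before the exception
    // reaches the catch block.
    static FakeContext runFailing() {
        FakeContext seen = null;
        try (FakeContext ctx = new FakeContext()) {
            seen = ctx;
            throw new IllegalStateException("simulated test failure");
        } catch (IllegalStateException expected) {
            // Swallowed only so the sketch can inspect the context afterwards.
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("closed after success: " + runOk().closed);
        System.out.println("closed after failure: " + runFailing().closed);
    }
}
```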

[GitHub] [incubator-hudi] hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r410005068
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
 
 Review comment:
   > Does this method use to init records for inserting? IMO, we should distinguish it with upsert.
   
   Yes, it is for inserting only; `upsert` has its own method.
   https://github.com/apache/incubator-hudi/blob/e1a47ff32f900d9c723dc907784210ce756915f9/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java#L284-L286

[GitHub] [incubator-hudi] hddong commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hddong commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-615161533
 
 
   @yanghua Thanks for your review; I have addressed the comments.

[GitHub] [incubator-hudi] hmatu commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hmatu commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612865257
 
 
   These changes don't make sense (`command` can only be `UPSERT`).

[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409393206
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
 
 Review comment:
   There is redundant boxing here. It would be better to use `Double.parseDouble(xxx)`.
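
A minimal sketch of the difference, assuming the `HoodieModel` constructor takes primitive `double` parameters (the diff passes `row.getDouble(...)` in the same positions, which suggests this but does not confirm it). `ParseDoubleSketch` and its method names are hypothetical.

```java
class ParseDoubleSketch {

    // Double.valueOf returns a boxed Double; returning it as a double
    // forces an immediate auto-unboxing step.
    static double viaValueOf(Object field) {
        return Double.valueOf(field.toString());
    }

    // Double.parseDouble returns the primitive directly, with no boxing.
    static double viaParseDouble(Object field) {
        return Double.parseDouble(field.toString());
    }

    public static void main(String[] args) {
        // Hypothetical value standing in for g.get("begin_lat").
        Object field = "37.7749";
        System.out.println(viaValueOf(field) == viaParseDouble(field));
    }
}
```

Both calls parse the same text to the same value, so the switch changes only the intermediate allocation, not the result.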

[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409389425
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
 
 Review comment:
   Is this method used to initialize the records for insertion? IMO, we should distinguish it from the upsert variant.
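
A side note on the quoted `testImportWithUpsert` hunk: `insertData.subList(11, 96)` returns a view, so the following `addAll` also writes into `insertData`. Whether that matters depends on how `insertData` is initialised, which the hunk does not show. A minimal sketch of the difference, using hypothetical names and plain `Integer` lists rather than the test's `GenericRecord` data:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubListViewDemo {

  // Mirrors the pattern in the quoted test: addAll on a subList view.
  static int parentSizeAfterAddAllOnView(List<Integer> parent, List<Integer> extra) {
    List<Integer> view = parent.subList(1, parent.size());
    view.addAll(extra); // structural change is written through to `parent`
    return parent.size();
  }

  // Defensive variant: copy the view first, so `parent` stays unchanged.
  static int parentSizeAfterAddAllOnCopy(List<Integer> parent, List<Integer> extra) {
    List<Integer> copy = new ArrayList<>(parent.subList(1, parent.size()));
    copy.addAll(extra);
    return parent.size();
  }

  public static void main(String[] args) {
    List<Integer> extra = Arrays.asList(10, 11);
    // prints 6: the parent grew together with the view
    System.out.println(parentSizeAfterAddAllOnView(new ArrayList<>(Arrays.asList(0, 1, 2, 3)), extra));
    // prints 4: the parent is untouched
    System.out.println(parentSizeAfterAddAllOnCopy(new ArrayList<>(Arrays.asList(0, 1, 2, 3)), extra));
  }
}
```

Wrapping the view in `new ArrayList<>(...)` keeps the expected-data list independent of the original insert data.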


[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409399060
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -275,4 +403,44 @@ private JavaSparkContext getJavaSparkContext() {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  private class HoodieModel {
+    double timestamp;
+    String rowKey;
+    String rider;
+    String driver;
+    double beginLat;
+    double beginLon;
+    double endLat;
+    double endLon;
+
+    private HoodieModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+        double beginLon, double endLat, double endLon) {
+      this.timestamp = timestamp;
+      this.rowKey = rowKey;
+      this.rider = rider;
+      this.driver = driver;
+      this.beginLat = beginLat;
+      this.beginLon = beginLon;
+      this.endLat = endLat;
+      this.endLon = endLon;
+    }
+
+    @Override
+    public boolean equals(Object o) {
 
 Review comment:
   As a good habit, when we override the `equals` method, it is better to also override `hashCode`, even though we do not use it here.
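
To illustrate why the two methods belong together, here is a minimal sketch with a hypothetical two-field `TripKey` class (not the PR's `HoodieModel`). Hash-based collections only find an equal object if `hashCode` agrees with `equals`. The sketch also uses `Double.compare` instead of `==` for the `double` field, so that `equals` stays consistent with `Double.hashCode` for `0.0` versus `-0.0`; that choice is a suggestion, not something the PR does.

```java
import java.util.HashSet;
import java.util.Set;

class TripKey {
  private final String rowKey;
  private final double timestamp;

  TripKey(String rowKey, double timestamp) {
    this.rowKey = rowKey;
    this.timestamp = timestamp;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    TripKey other = (TripKey) o;
    // Double.compare treats 0.0 and -0.0 as different, matching Double.hashCode.
    return Double.compare(timestamp, other.timestamp) == 0 && rowKey.equals(other.rowKey);
  }

  @Override
  public int hashCode() {
    // Built from exactly the fields that equals compares.
    return 31 * rowKey.hashCode() + Double.hashCode(timestamp);
  }

  // A HashSet lookup with a distinct-but-equal instance only succeeds
  // because hashCode is overridden consistently with equals.
  static boolean foundInHashSet() {
    Set<TripKey> seen = new HashSet<>();
    seen.add(new TripKey("42", 1.0));
    return seen.contains(new TripKey("42", 1.0));
  }

  public static void main(String[] args) {
    System.out.println(foundInHashSet());
  }
}
```

The PR's assertions use `List.containsAll`, which relies on `equals` only, so the missing `hashCode` would not fail the test today; it would matter as soon as the model is put into a `HashSet` or used as a `HashMap` key.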


[GitHub] [incubator-hudi] hddong commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hddong commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612930401
 
 
   @hmatu Before this change, `upsert` was equivalent to `insert`, because the target path must not exist (it is deleted if present). After this change, we can `upsert` on top of the existing data.
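
The intended semantics can be sketched with a plain in-memory map keyed by row key. This is only a model of the behaviour, not Hudi's API: the class and method names are hypothetical, and it assumes 96 existing records keyed `0` to `95`, which the `subList(11, 96)` call in the PR's test suggests.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UpsertSketch {

  // Apply `incoming` on top of `table`: matching keys are overwritten,
  // unknown keys are added, everything else is kept as-is.
  static Map<String, String> upsert(Map<String, String> table, Map<String, String> incoming) {
    Map<String, String> merged = new LinkedHashMap<>(table);
    merged.putAll(incoming);
    return merged;
  }

  static Map<String, String> demo() {
    // Existing data: keys 0..95 (assumed).
    Map<String, String> table = new LinkedHashMap<>();
    for (int key = 0; key < 96; key++) {
      table.put(Integer.toString(key), "rider-" + key);
    }
    // Incoming data, shaped like the test's createUpsertRecords:
    // keys 0..10 update existing rows, keys 96..99 are new rows.
    Map<String, String> incoming = new LinkedHashMap<>();
    for (int key = 0; key < 11; key++) {
      incoming.put(Integer.toString(key), "rider-upsert-" + key);
    }
    for (int key = 96; key < 100; key++) {
      incoming.put(Integer.toString(key), "rider-upsert-" + key);
    }
    return upsert(table, incoming);
  }

  public static void main(String[] args) {
    Map<String, String> result = demo();
    System.out.println(result.size());    // 100 rows in total
    System.out.println(result.get("0"));  // updated row
    System.out.println(result.get("50")); // untouched row
  }
}
```

Under the old behaviour described above, the result would contain only the 15 incoming rows, because the target was deleted first.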


[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409396839
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
 
 Review comment:
   `ParseException` would never be thrown.


[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409396156
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
 
 Review comment:
   ditto, `try-with-resources`?


[GitHub] [incubator-hudi] codecov-io edited a comment on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612848674


   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=h1) Report
   > Merging [#1511](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/f5f34bb1c16e6d070668486eba2a29f554c0bbc7&el=desc) will **decrease** coverage by `0.49%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1511/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1511      +/-   ##
   ============================================
   - Coverage     72.15%   71.66%   -0.50%     
   - Complexity      290      294       +4     
   ============================================
     Files           338      378      +40     
     Lines         15929    16535     +606     
     Branches       1625     1672      +47     
   ============================================
   + Hits          11494    11849     +355     
   - Misses         3704     3954     +250     
   - Partials        731      732       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../org/apache/hudi/table/HoodieCopyOnWriteTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllQ29weU9uV3JpdGVUYWJsZS5qYXZh) | `61.62% <0.00%> (-27.66%)` | `0.00% <0.00%> (ø%)` | |
   | [.../org/apache/hudi/table/HoodieMergeOnReadTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllTWVyZ2VPblJlYWRUYWJsZS5qYXZh) | `60.00% <0.00%> (-23.13%)` | `0.00% <0.00%> (ø%)` | |
   | [...hudi/common/fs/inline/InLineFsDataInputStream.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGc0RhdGFJbnB1dFN0cmVhbS5qYXZh) | `38.46% <0.00%> (-15.39%)` | `0.00% <0.00%> (ø%)` | |
   | [...di/hadoop/realtime/HoodieRealtimeRecordReader.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVJlYWx0aW1lUmVjb3JkUmVhZGVyLmphdmE=) | `70.00% <0.00%> (-14.22%)` | `0.00% <0.00%> (ø%)` | |
   | [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `0.00% <0.00%> (ø%)` | |
   | [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | `68.33% <0.00%> (-5.83%)` | `0.00% <0.00%> (ø%)` | |
   | [...lities/checkpointing/KafkaConnectHdfsProvider.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2NoZWNrcG9pbnRpbmcvS2Fma2FDb25uZWN0SGRmc1Byb3ZpZGVyLmphdmE=) | `89.28% <0.00%> (-3.03%)` | `14.00% <0.00%> (+2.00%)` | :arrow_down: |
   | [...n/java/org/apache/hudi/index/hbase/HBaseIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvaGJhc2UvSEJhc2VJbmRleC5qYXZh) | `83.25% <0.00%> (-0.96%)` | `0.00% <0.00%> (ø%)` | |
   | [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <0.00%> (-0.78%)` | `0.00% <0.00%> (ø%)` | |
   | [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.70% <0.00%> (-0.71%)` | `22.00% <0.00%> (+1.00%)` | :arrow_down: |
   | ... and [77 more](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=footer). Last update [f5f34bb...9381b9b](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r411053511



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  private class HoodieModel {

Review comment:
       Can we mark this class to be a `static` class and rename it to `HoodieTripModel`?
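
For context on the `static` suggestion: a non-static inner class keeps a hidden reference to its enclosing instance and cannot be created from a static context, while a static nested class has neither restriction. A minimal sketch with hypothetical names (`NestedClassDemo`, `InnerModel`, `TripModel`), not the PR's actual classes:

```java
class NestedClassDemo {
  private final String basePath = "/tmp/base"; // stand-in for per-test state

  // Inner class: each instance is tied to a NestedClassDemo instance
  // and can read its fields.
  class InnerModel {
    String describe() {
      return "inner@" + basePath;
    }
  }

  // Static nested class: a plain value holder, independent of any outer instance.
  static class TripModel {
    final String rowKey;

    TripModel(String rowKey) {
      this.rowKey = rowKey;
    }
  }

  static String fromStaticContext() {
    // `new InnerModel()` would not compile here: there is no enclosing instance.
    return new TripModel("42").rowKey;
  }

  String fromInstanceContext() {
    return new InnerModel().describe();
  }

  public static void main(String[] args) {
    System.out.println(fromStaticContext());
    System.out.println(new NestedClassDemo().fromInstanceContext());
  }
}
```

A comparison-only model that never touches the test's fields fits the static form.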

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  private class HoodieModel {
+    double timestamp;
+    String rowKey;
+    String rider;
+    String driver;
+    double beginLat;
+    double beginLon;
+    double endLat;
+    double endLon;
+
+    private HoodieModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+        double beginLon, double endLat, double endLon) {
+      this.timestamp = timestamp;
+      this.rowKey = rowKey;
+      this.rider = rider;
+      this.driver = driver;
+      this.beginLat = beginLat;
+      this.beginLon = beginLon;
+      this.endLat = endLat;
+      this.endLon = endLon;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      HoodieModel other = (HoodieModel) o;
+      return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+          && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+          && endLat == other.endLat && endLon == other.endLon;
+    }
+
+    @Override
+    public int hashCode() {

Review comment:
       It's a bit long. Here, I would suggest using `Objects.hash(...)`.
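
The body of the PR's `hashCode` is not shown in the quoted hunk, so the hand-rolled method below is a hypothetical stand-in limited to three of the fields. It is a sketch of how `Objects.hash(...)` collapses the usual `31 * result + ...` chain into one call while producing the same value:

```java
import java.util.Objects;

class HashCodeStyles {

  // Typical hand-written form: one line per field.
  static int handRolled(double timestamp, String rowKey, String rider) {
    int result = 1;
    result = 31 * result + Double.hashCode(timestamp);
    result = 31 * result + rowKey.hashCode();
    result = 31 * result + rider.hashCode();
    return result;
  }

  // Same computation via Objects.hash, which boxes the arguments
  // and delegates to Arrays.hashCode.
  static int withObjectsHash(double timestamp, String rowKey, String rider) {
    return Objects.hash(timestamp, rowKey, rider);
  }

  public static void main(String[] args) {
    System.out.println(handRolled(1.5, "key", "rider") == withObjectsHash(1.5, "key", "rider"));
  }
}
```

`Objects.hash` allocates a varargs array and boxes primitives on each call, which is normally irrelevant for a test helper like this one.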

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {

Review comment:
       I mean for matching purpose, can we rename it to `createInsertRecords`?

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -171,6 +277,30 @@ private void createRecords(Path srcFolder) throws ParseException, IOException {
       writer.write(record);
     }
     writer.close();
+    return records;
+  }
+
+  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+    long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+    List<GenericRecord> records = new ArrayList<GenericRecord>();
+    // 10 for update
+    for (long recordNum = 0; recordNum < 11; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    // 4 for insert
+    for (long recordNum = 96; recordNum < 100; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
+    for (GenericRecord record : records) {
+      writer.write(record);

Review comment:
       This method throws `IOException`, so the `.close` call should be wrapped in a `finally` or `try-with-resources` block. The same issue exists in the `createRecords` method.
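
A minimal sketch of the point, assuming the writer is `Closeable`. `RecordingWriter` is a hypothetical stand-in for the Parquet writer so that the example runs without Hadoop on the classpath; it fails on an empty record and remembers whether `close()` was called.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

class CloseOnFailureDemo {

  // Hypothetical stand-in for the real writer.
  static class RecordingWriter implements Closeable {
    boolean closed = false;

    void write(String record) throws IOException {
      if (record.isEmpty()) {
        throw new IOException("cannot write empty record");
      }
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // try-with-resources closes the writer whether or not write() throws.
  static void writeAll(RecordingWriter writer, List<String> records) throws IOException {
    try (RecordingWriter w = writer) {
      for (String record : records) {
        w.write(record);
      }
    }
  }

  static boolean closedAfterFailure() {
    RecordingWriter writer = new RecordingWriter();
    try {
      writeAll(writer, Arrays.asList("a", "", "c")); // second record fails
    } catch (IOException expected) {
      // The failure still reaches the caller; only the cleanup is guaranteed.
    }
    return writer.closed;
  }

  public static void main(String[] args) {
    System.out.println(closedAfterFailure());
  }
}
```

With a bare `writer.close()` after the loop, as in the quoted hunk, the exception from `write` would skip the close call and leave the file handle open.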

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -150,14 +166,104 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
       for (Entry<String, Long> e : recordCounts.entrySet()) {
         assertEquals("missing records", 24, e.getValue().longValue());
       }
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {

Review comment:
       rename to `testImportWithInsert`?







[GitHub] [incubator-hudi] hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r411258007



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /**
+   * Class used for compare result and expected.
+   */
+  private class HoodieModel {

Review comment:
       > Can we mark this class to be a `static` class and rename it to `HoodieTripModel`?
   
   Yes, and I will also make it `public`, since `testHDFSParquetImportCommand` may reuse it.



