Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/13 07:15:13 UTC
[GitHub] [incubator-hudi] hddong opened a new pull request #1511:
[HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511
## *Tips*
- *Thank you very much for contributing to Apache Hudi.*
- *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
## What is the purpose of the pull request
*Currently, in `HDFSParquetImporter`, `upsert` is equivalent to `insert`: the old metadata is removed, then the table is re-created and the data inserted. But `upsert` should mean updating and inserting on top of the existing data.*
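The intended distinction can be sketched as follows. This is a hypothetical simplification to illustrate the point of the PR, not the actual Hudi code; `ImportCommandSketch`, `Command`, and the returned strings are all made up for illustration:

```java
// Hypothetical sketch of the intended command dispatch (not the actual
// HDFSParquetImporter code): "insert" rebuilds the table from scratch,
// while "upsert" must merge new records into the existing table.
public class ImportCommandSketch {

    enum Command { INSERT, UPSERT }

    static String run(Command command, boolean tableExists) {
        if (command == Command.INSERT) {
            // insert: drop old metadata, re-create the table, bulk-load data
            return "recreate table, then bulk insert";
        }
        // upsert: keep the existing table and merge records by key
        if (!tableExists) {
            return "nothing to update, fall back to insert";
        }
        return "update matching keys, insert new keys";
    }

    public static void main(String[] args) {
        System.out.println(run(Command.INSERT, true));
        System.out.println(run(Command.UPSERT, true));
    }
}
```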
## Brief change log
- *Adjust logic of upsert in HDFSParquetImporter*
## Verify this pull request
This pull request is a trivial rework / code cleanup without any test coverage.
## Committer checklist
- [ ] Has a corresponding JIRA in PR title & commit
- [ ] Commit message is descriptive of the change
- [ ] CI is green
- [ ] Necessary doc changes done or have another open PR
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [incubator-hudi] hmatu commented on a change in pull request #1511:
[HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r407815629
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
##########
@@ -100,6 +100,10 @@ public static void main(String[] args) {
}
+ private boolean isUpsert() {
Review comment:
Use a boolean `flag`?
----------------------------------------------------------------
[GitHub] [incubator-hudi] yanghua commented on a change in pull request
#1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409395668
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
Review comment:
I would suggest one field per line for so many arguments.
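The suggested layout can be sketched like this. `SimpleModel` is a made-up stand-in for the test's `HoodieModel` so the snippet runs on its own; the point is only the formatting of the many-argument constructor call:

```java
// Sketch of the "one field per line" layout for a constructor call with
// many arguments. SimpleModel is a stand-in for HoodieModel.
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OneFieldPerLine {

    static class SimpleModel {
        final double timestamp;
        final String rowKey;
        final double beginLat;
        final double beginLon;

        SimpleModel(double timestamp, String rowKey, double beginLat, double beginLon) {
            this.timestamp = timestamp;
            this.rowKey = rowKey;
            this.beginLat = beginLat;
            this.beginLon = beginLon;
        }
    }

    public static void main(String[] args) {
        List<double[]> rows = Arrays.asList(new double[] {1.0, 2.0, 3.0});
        // One constructor argument per line: each field is easy to scan,
        // and a diff touches only the line that actually changed.
        List<SimpleModel> models = rows.stream().map(r ->
                new SimpleModel(
                    r[0],          // timestamp
                    "key-" + r[0], // rowKey
                    r[1],          // beginLat
                    r[2]))         // beginLon
            .collect(Collectors.toList());
        System.out.println(models.size());
    }
}
```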
----------------------------------------------------------------
[GitHub] [incubator-hudi] codecov-io commented on issue #1511:
[HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612848674
# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=h1) Report
> Merging [#1511](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/f5f34bb1c16e6d070668486eba2a29f554c0bbc7&el=desc) will **decrease** coverage by `0.02%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1511/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #1511 +/- ##
============================================
- Coverage 72.15% 72.12% -0.03%
+ Complexity 290 289 -1
============================================
Files 338 365 +27
Lines 15929 16228 +299
Branches 1625 1632 +7
============================================
+ Hits 11494 11705 +211
- Misses 3704 3790 +86
- Partials 731 733 +2
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../org/apache/hudi/table/HoodieCopyOnWriteTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllQ29weU9uV3JpdGVUYWJsZS5qYXZh) | `69.64% <0.00%> (-19.65%)` | `0.00% <0.00%> (ø%)` | |
| [...hudi/common/fs/inline/InLineFsDataInputStream.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGc0RhdGFJbnB1dFN0cmVhbS5qYXZh) | `38.46% <0.00%> (-15.39%)` | `0.00% <0.00%> (ø%)` | |
| [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `0.00% <0.00%> (ø%)` | |
| [.../org/apache/hudi/table/HoodieMergeOnReadTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllTWVyZ2VPblJlYWRUYWJsZS5qYXZh) | `73.33% <0.00%> (-9.80%)` | `0.00% <0.00%> (ø%)` | |
| [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | `68.50% <0.00%> (-5.65%)` | `0.00% <0.00%> (ø%)` | |
| [...e/hudi/common/table/log/HoodieLogFormatWriter.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3RhYmxlL2xvZy9Ib29kaWVMb2dGb3JtYXRXcml0ZXIuamF2YQ==) | `75.00% <0.00%> (-1.93%)` | `0.00% <0.00%> (ø%)` | |
| [...n/java/org/apache/hudi/index/hbase/HBaseIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvaGJhc2UvSEJhc2VJbmRleC5qYXZh) | `83.25% <0.00%> (-0.96%)` | `0.00% <0.00%> (ø%)` | |
| [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <0.00%> (-0.78%)` | `0.00% <0.00%> (ø%)` | |
| [...s/deltastreamer/HoodieMultiTableDeltaStreamer.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlbHRhc3RyZWFtZXIvSG9vZGllTXVsdGlUYWJsZURlbHRhU3RyZWFtZXIuamF2YQ==) | `78.39% <0.00%> (-0.49%)` | `18.00% <0.00%> (ø%)` | |
| [...c/main/java/org/apache/hudi/table/HoodieTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllVGFibGUuamF2YQ==) | `79.64% <0.00%> (ø)` | `0.00% <0.00%> (ø%)` | |
| ... and [39 more](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=footer). Last update [f5f34bb...e0c6fc5](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [incubator-hudi] yanghua commented on a change in pull request
#1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409394674
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
Review comment:
Can we use `try-with-resources` here?
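For context, `JavaSparkContext` implements `java.io.Closeable`, so the explicit null check and `jsc.stop()` in the `finally` block can be replaced by a compiler-generated close. A minimal sketch of the pattern, using a made-up `FakeContext` stand-in so it runs without a Spark dependency:

```java
// Sketch of the try-with-resources pattern suggested in the review.
// FakeContext stands in for JavaSparkContext (which implements
// java.io.Closeable); close() stands in for jsc.stop().
public class TryWithResourcesSketch {

    static class FakeContext implements AutoCloseable {
        boolean stopped = false;

        @Override
        public void close() {
            stopped = true; // stands in for jsc.stop()
        }
    }

    static FakeContext lastContext;

    public static void main(String[] args) {
        // The resource is closed automatically when the block exits,
        // even if the body throws, so no finally block is needed.
        try (FakeContext jsc = new FakeContext()) {
            lastContext = jsc;
            // ... run the import and assertions against jsc ...
        }
        System.out.println(lastContext.stopped); // true
    }
}
```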
----------------------------------------------------------------
[GitHub] [incubator-hudi] hddong commented on a change in pull request
#1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r410005068
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
Review comment:
> Is this method used to initialize records for inserting? IMO, we should distinguish it from upsert.
Yes, it is used for inserting only; `upsert` has its own method.
https://github.com/apache/incubator-hudi/blob/e1a47ff32f900d9c723dc907784210ce756915f9/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java#L284-L286
----------------------------------------------------------------
[GitHub] [incubator-hudi] hddong commented on issue #1511: [HUDI-789]Adjust
logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-615161533
@yanghua Thanks for your review, I have addressed them.
----------------------------------------------------------------
[GitHub] [incubator-hudi] hmatu commented on issue #1511: [HUDI-789]Adjust
logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612865257
These changes don't make sense (the `command` here is only `UPSERT`).
----------------------------------------------------------------
[GitHub] [incubator-hudi] yanghua commented on a change in pull request
#1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409393206
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
Review comment:
There is a redundant boxing issue here. It would be better to use `Double.parseDouble(xxx)`.
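The boxing issue the reviewer points at: `Double.valueOf` returns a boxed `Double` object, which is then immediately auto-unboxed when passed to a `double` parameter, while `Double.parseDouble` returns the primitive directly. A small self-contained sketch (the `square` helper is made up for illustration):

```java
// Sketch of the boxing issue: Double.valueOf returns a boxed Double,
// which is immediately auto-unboxed when passed to a double parameter;
// Double.parseDouble returns the primitive directly and skips the
// intermediate object.
public class ParseDoubleSketch {

    static double square(double x) {
        return x * x;
    }

    public static void main(String[] args) {
        String raw = "3.5";

        // Redundant boxing: String -> Double (heap object) -> double
        double boxed = square(Double.valueOf(raw));

        // Direct: String -> double
        double primitive = square(Double.parseDouble(raw));

        System.out.println(boxed == primitive); // true
    }
}
```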
----------------------------------------------------------------
[GitHub] [incubator-hudi] yanghua commented on a change in pull request
#1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409389425
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
Review comment:
Is this method used to init records for inserting? IMO, we should distinguish it from upsert.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409399060
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +403,44 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare result and expected data.
+ */
+ private class HoodieModel {
+ double timestamp;
+ String rowKey;
+ String rider;
+ String driver;
+ double beginLat;
+ double beginLon;
+ double endLat;
+ double endLon;
+
+ private HoodieModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+ double beginLon, double endLat, double endLon) {
+ this.timestamp = timestamp;
+ this.rowKey = rowKey;
+ this.rider = rider;
+ this.driver = driver;
+ this.beginLat = beginLat;
+ this.beginLon = beginLon;
+ this.endLat = endLat;
+ this.endLon = endLon;
+ }
+
+ @Override
+ public boolean equals(Object o) {
Review comment:
As a good habit, when we override the `equals` method, it would also be better to override `hashCode`, even though we don't use it here.
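A self-contained illustration of the reviewer's point, using a hypothetical `TripKey` class rather than the PR's `HoodieModel`: hash-based collections consult `hashCode` to pick a bucket before ever calling `equals`, so overriding only `equals` silently breaks `HashSet`/`HashMap` lookups.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a value class like HoodieModel.
class TripKey {
    final String rowKey;
    final double timestamp;

    TripKey(String rowKey, double timestamp) {
        this.rowKey = rowKey;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        TripKey other = (TripKey) o;
        return rowKey.equals(other.rowKey) && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
        // Without this override, two equal TripKeys could land in different
        // HashSet buckets and contains() would return false.
        return Objects.hash(rowKey, timestamp);
    }
}

public class EqualsHashCodeDemo {
    public static void main(String[] args) {
        Set<TripKey> set = new HashSet<>();
        set.add(new TripKey("row-1", 1.0));
        System.out.println(set.contains(new TripKey("row-1", 1.0))); // true
    }
}
```

If the `hashCode` override above is deleted, `contains` on a newly constructed but equal `TripKey` can return `false`, which is exactly the contract violation the comment warns about.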
[GitHub] [incubator-hudi] hddong commented on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
hddong commented on issue #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612930401
@hmatu Before this change, `upsert` was equivalent to `insert`, because the target path must not be present (it will be deleted if present). After this change, we can do `upsert` based on existing data.
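The distinction hddong describes can be illustrated outside Hudi. A minimal, hypothetical sketch using plain Java maps (not Hudi's API): "insert" rebuilds the target from the incoming records alone, while "upsert" merges incoming records into the existing data by key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch (not Hudi code) of the semantics the PR fixes.
public class UpsertSemanticsDemo {
    // "insert": old data is dropped; only the incoming records survive.
    static Map<String, String> insert(Map<String, String> target, Map<String, String> incoming) {
        return new LinkedHashMap<>(incoming);
    }

    // "upsert": matching keys are updated, new keys are inserted,
    // untouched keys are kept.
    static Map<String, String> upsert(Map<String, String> target, Map<String, String> incoming) {
        Map<String, String> merged = new LinkedHashMap<>(target);
        merged.putAll(incoming);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> existing = new LinkedHashMap<>();
        existing.put("row-1", "rider-old");
        existing.put("row-2", "rider-old");
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("row-2", "rider-new");
        incoming.put("row-3", "rider-new");

        System.out.println(upsert(existing, incoming).size()); // 3: row-1 kept, row-2 updated, row-3 added
        System.out.println(insert(existing, incoming).size()); // 2: only incoming survives
    }
}
```

The pre-change importer behaved like `insert` for both commands; the PR makes the `upsert` command behave like the merge above, against whatever the target path already contains.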
[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409396839
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
Review comment:
`ParseException` would never be thrown.
[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409396156
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
Review comment:
ditto, `try-with-resources`?
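As a sketch of the try-with-resources suggestion: `JavaSparkContext` implements `Closeable` (its `close()` delegates to `stop()`), so the null-check/`finally` pattern in the test can collapse into a single try block. Since Spark can't be spun up here, the runnable demo below uses a stand-in resource; the Spark version would read `try (JavaSparkContext jsc = getJavaSparkContext()) { ... }`.

```java
// Stand-in for a Closeable resource such as JavaSparkContext.
class FakeContext implements AutoCloseable {
    boolean stopped = false;

    @Override
    public void close() {
        stopped = true; // JavaSparkContext.close() would call stop() here
    }
}

public class TryWithResourcesDemo {
    public static void main(String[] args) {
        FakeContext captured;
        try (FakeContext jsc = new FakeContext()) {
            captured = jsc;
            // ... run the import against jsc ...
        } // close() is invoked automatically, even if an exception is thrown
        System.out.println(captured.stopped); // true
    }
}
```

This removes both the `JavaSparkContext jsc = null;` declaration and the `if (jsc != null) jsc.stop();` block from each test.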
[GitHub] [incubator-hudi] codecov-io edited a comment on issue #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#issuecomment-612848674
# [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=h1) Report
> Merging [#1511](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/f5f34bb1c16e6d070668486eba2a29f554c0bbc7&el=desc) will **decrease** coverage by `0.49%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1511/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree)
```diff
@@             Coverage Diff              @@
##             master     #1511     +/-  ##
============================================
- Coverage     72.15%    71.66%    -0.50%
- Complexity      290       294        +4
============================================
  Files           338       378       +40
  Lines         15929     16535      +606
  Branches       1625      1672       +47
============================================
+ Hits          11494     11849      +355
- Misses         3704      3954      +250
- Partials        731       732        +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
|---|---|---|---|
| [.../org/apache/hudi/table/HoodieCopyOnWriteTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllQ29weU9uV3JpdGVUYWJsZS5qYXZh) | `61.62% <0.00%> (-27.66%)` | `0.00% <0.00%> (ø%)` | |
| [.../org/apache/hudi/table/HoodieMergeOnReadTable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvSG9vZGllTWVyZ2VPblJlYWRUYWJsZS5qYXZh) | `60.00% <0.00%> (-23.13%)` | `0.00% <0.00%> (ø%)` | |
| [...hudi/common/fs/inline/InLineFsDataInputStream.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9JbkxpbmVGc0RhdGFJbnB1dFN0cmVhbS5qYXZh) | `38.46% <0.00%> (-15.39%)` | `0.00% <0.00%> (ø%)` | |
| [...di/hadoop/realtime/HoodieRealtimeRecordReader.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVJlYWx0aW1lUmVjb3JkUmVhZGVyLmphdmE=) | `70.00% <0.00%> (-14.22%)` | `0.00% <0.00%> (ø%)` | |
| [...ache/hudi/common/fs/inline/InMemoryFileSystem.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL2ZzL2lubGluZS9Jbk1lbW9yeUZpbGVTeXN0ZW0uamF2YQ==) | `79.31% <0.00%> (-10.35%)` | `0.00% <0.00%> (ø%)` | |
| [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | `68.33% <0.00%> (-5.83%)` | `0.00% <0.00%> (ø%)` | |
| [...lities/checkpointing/KafkaConnectHdfsProvider.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2NoZWNrcG9pbnRpbmcvS2Fma2FDb25uZWN0SGRmc1Byb3ZpZGVyLmphdmE=) | `89.28% <0.00%> (-3.03%)` | `14.00% <0.00%> (+2.00%)` | :arrow_down: |
| [...n/java/org/apache/hudi/index/hbase/HBaseIndex.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvaGJhc2UvSEJhc2VJbmRleC5qYXZh) | `83.25% <0.00%> (-0.96%)` | `0.00% <0.00%> (ø%)` | |
| [...oop/realtime/HoodieParquetRealtimeInputFormat.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS1oYWRvb3AtbXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaGFkb29wL3JlYWx0aW1lL0hvb2RpZVBhcnF1ZXRSZWFsdGltZUlucHV0Rm9ybWF0LmphdmE=) | `72.34% <0.00%> (-0.78%)` | `0.00% <0.00%> (ø%)` | |
| [...in/java/org/apache/hudi/utilities/UtilHelpers.java](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL1V0aWxIZWxwZXJzLmphdmE=) | `64.70% <0.00%> (-0.71%)` | `22.00% <0.00%> (+1.00%)` | :arrow_down: |
| ... and [77 more](https://codecov.io/gh/apache/incubator-hudi/pull/1511/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=footer). Last update [f5f34bb...9381b9b](https://codecov.io/gh/apache/incubator-hudi/pull/1511?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r411053511
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare result and expected data.
+ */
+ private class HoodieModel {
Review comment:
Can we make this class `static` and rename it to `HoodieTripModel`?
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare result and expected data.
+ */
+ private class HoodieModel {
+ double timestamp;
+ String rowKey;
+ String rider;
+ String driver;
+ double beginLat;
+ double beginLon;
+ double endLat;
+ double endLon;
+
+ private HoodieModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+ double beginLon, double endLat, double endLon) {
+ this.timestamp = timestamp;
+ this.rowKey = rowKey;
+ this.rider = rider;
+ this.driver = driver;
+ this.beginLat = beginLat;
+ this.beginLon = beginLon;
+ this.endLat = endLat;
+ this.endLon = endLon;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieModel other = (HoodieModel) o;
+ return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+ && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+ && endLat == other.endLat && endLon == other.endLon;
+ }
+
+ @Override
+ public int hashCode() {
Review comment:
It's a bit long. Here, I would suggest using `Objects.hash(...)`.
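A hedged sketch of the `Objects.hash(...)` suggestion, using a stand-in class with the same eight fields as the quoted `HoodieModel` (the class name and constructor here are illustrative, not the PR's code):

```java
import java.util.Objects;

public class HashDemo {
    double timestamp;
    String rowKey, rider, driver;
    double beginLat, beginLon, endLat, endLon;

    public HashDemo(double timestamp, String rowKey, String rider, String driver,
                    double beginLat, double beginLon, double endLat, double endLon) {
        this.timestamp = timestamp;
        this.rowKey = rowKey;
        this.rider = rider;
        this.driver = driver;
        this.beginLat = beginLat;
        this.beginLon = beginLon;
        this.endLat = endLat;
        this.endLon = endLon;
    }

    @Override
    public int hashCode() {
        // Objects.hash boxes its arguments into an array and delegates to
        // Arrays.hashCode, keeping the override to one line instead of a
        // hand-rolled 31-multiplier chain.
        return Objects.hash(timestamp, rowKey, rider, driver,
                beginLat, beginLon, endLat, endLon);
    }
}
```

The field list passed to `Objects.hash` should match exactly the fields compared in `equals`, so the two methods stay consistent.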
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieModel> expected = insertData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ JavaSparkContext jsc = null;
+ try {
+ jsc = getJavaSparkContext();
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // construct result, remove top 10 and add upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieModel> result = readData.stream().map(row ->
+ new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result
+ List<HoodieModel> expected = expectData.stream().map(g ->
+ new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+ Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+ Double.valueOf(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ } finally {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+ }
+
+ private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
Review comment:
I mean, for naming consistency, can we rename it to `createInsertRecords`?
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -171,6 +277,30 @@ private void createRecords(Path srcFolder) throws ParseException, IOException {
writer.write(record);
}
writer.close();
+ return records;
+ }
+
+ private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+ Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+ long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+ List<GenericRecord> records = new ArrayList<GenericRecord>();
+ // 10 for update
+ for (long recordNum = 0; recordNum < 11; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ // 4 for insert
+ for (long recordNum = 96; recordNum < 100; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+ .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
+ for (GenericRecord record : records) {
+ writer.write(record);
Review comment:
This method throws `IOException`, so the `close()` call should be wrapped in a `finally` block or replaced with try-with-resources. The same issue exists in the `createRecords` method.
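`ParquetWriter` implements `Closeable`, so the write loop can be wrapped in try-with-resources. The Parquet-specific call is shown only as a comment (it needs a Hadoop environment); the runnable part demonstrates the same pattern with a stdlib `Writer`:

```java
// With the real ParquetWriter the fix would look like:
//
//   try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
//           .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA)
//           .withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
//     for (GenericRecord record : records) {
//       writer.write(record);
//     }
//   } // writer.close() runs even if write() throws
//
// Runnable stdlib analogy:
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

public class WriterCloseDemo {
    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        try (Writer w = out) {
            w.write("record"); // if this throws, close() still runs
        }
        System.out.println(out.toString()); // record
    }
}
```

Without the try-with-resources (or a `finally`), an exception in `write()` leaks the writer and can leave a partially written file open.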
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -150,14 +166,104 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals("missing records", 24, e.getValue().longValue());
}
- } finally {
- if (jsc != null) {
- jsc.stop();
- }
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportInsert() throws IOException, ParseException {
Review comment:
rename to `testImportWithInsert`?
[GitHub] [incubator-hudi] hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
Posted by GitBox <gi...@apache.org>.
hddong commented on a change in pull request #1511:
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r411258007
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
##########
@@ -275,4 +380,70 @@ private JavaSparkContext getJavaSparkContext() {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Class used to compare result and expected data.
+ */
+ private class HoodieModel {
Review comment:
> Can we mark this class to be a `static` class and rename it to `HoodieTripModel`?
Yes, and change it to `public`; `testHDFSParquetImportCommand` may reuse it.