You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/16 09:12:50 UTC

[GitHub] [incubator-hudi] yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

yanghua commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r409395668
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // construct result, remove top 10 and add upsert data.
+      List<GenericRecord> expectData = insertData.subList(11, 96);
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
 
 Review comment:
   I would suggest putting each argument on its own line when there are this many arguments.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services