Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/17 05:48:48 UTC

[GitHub] [incubator-hudi] hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter

hddong commented on a change in pull request #1511: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter
URL: https://github.com/apache/incubator-hudi/pull/1511#discussion_r410005068
 
 

 ##########
 File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
 ##########
 @@ -157,7 +176,103 @@ protected int dataImport(JavaSparkContext jsc) throws IOException {
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException, ParseException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportInsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
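+      // Read back the imported table from the target path written by the importer.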
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieModel> expected = insertData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
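+      // Order-insensitive equality check: mutual containsAll plus a size match.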
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  /**
+   * Test upsert data and verify data consistency.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    JavaSparkContext jsc = null;
+    try {
+      jsc = getJavaSparkContext();
+      insert(jsc);
+
+      // Create schema file.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
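+      // Switch to upsert mode so records with matching keys overwrite the earlier inserts.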
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // Construct the expected result: drop the inserted records that the
+      // upsert replaces (indices 0 through 10) and append the upsert data.
+      // Copy the subList so insertData is not mutated through the view.
+      List<GenericRecord> expectData = new ArrayList<>(insertData.subList(11, 96));
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieModel> result = readData.stream().map(row ->
+          new HoodieModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result
+      List<HoodieModel> expected = expectData.stream().map(g ->
+          new HoodieModel(Double.valueOf(g.get("timestamp").toString()), g.get("_row_key").toString(), g.get("rider").toString(), g.get("driver").toString(),
+              Double.valueOf(g.get("begin_lat").toString()), Double.valueOf(g.get("begin_lon").toString()), Double.valueOf(g.get("end_lat").toString()),
+              Double.valueOf(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    } finally {
+      if (jsc != null) {
+        jsc.stop();
+      }
+    }
+  }
+
+  private List<GenericRecord> createRecords(Path srcFolder) throws ParseException, IOException {
 
 Review comment:
  > Is this method used to init records for inserting? IMO, we should distinguish it from upsert.
   
  Yes, it is for inserting only; `upsert` has its own method:
   https://github.com/apache/incubator-hudi/blob/e1a47ff32f900d9c723dc907784210ce756915f9/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java#L284-L286
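  
  For illustration, here is a minimal sketch of what such a dedicated upsert helper can look like. This is not the PR's actual code (that lives at the permalink above); the Avro schema literal, the `trip_<i>` row-key pattern, and the record count are assumptions based on the columns the tests above select:
  
  ```java
  // Fragment of TestHDFSParquetImporter; imports shown for completeness.
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
  
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericRecord;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.avro.AvroParquetWriter;
  import org.apache.parquet.hadoop.ParquetWriter;
  
  // Assumed schema mirroring the columns the tests read back.
  private static final Schema UPSERT_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"triprec\",\"fields\":["
          + "{\"name\":\"timestamp\",\"type\":\"double\"},"
          + "{\"name\":\"_row_key\",\"type\":\"string\"},"
          + "{\"name\":\"rider\",\"type\":\"string\"},"
          + "{\"name\":\"driver\",\"type\":\"string\"},"
          + "{\"name\":\"begin_lat\",\"type\":\"double\"},"
          + "{\"name\":\"begin_lon\",\"type\":\"double\"},"
          + "{\"name\":\"end_lat\",\"type\":\"double\"},"
          + "{\"name\":\"end_lon\",\"type\":\"double\"}]}");
  
  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws IOException {
    Path srcFile = new Path(srcFolder, "upsertSrcFile.parquet");
    List<GenericRecord> updates = new ArrayList<>();
    // Reuse the keys of rows written by createRecords so the upsert
    // overwrites them; the "trip_" key pattern is a placeholder here.
    for (int i = 0; i < 10; i++) {
      GenericRecord record = new GenericData.Record(UPSERT_SCHEMA);
      record.put("timestamp", 2.0);
      record.put("_row_key", "trip_" + i);
      record.put("rider", "rider-upsert");
      record.put("driver", "driver-upsert");
      record.put("begin_lat", 0.5);
      record.put("begin_lon", 0.5);
      record.put("end_lat", 0.5);
      record.put("end_lon", 0.5);
      updates.add(record);
    }
    // Write the updated records as a Parquet file in the upsert source folder.
    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(srcFile)
        .withSchema(UPSERT_SCHEMA)
        .withConf(new Configuration())
        .build()) {
      for (GenericRecord record : updates) {
        writer.write(record);
      }
    }
    return updates;
  }
  ```
  
  Keeping this separate from `createRecords` means a change to the insert fixture cannot silently mask an upsert regression, which is the distinction the review asks for.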
