You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/21 06:21:37 UTC

[incubator-hudi] branch master updated: [HUDI-789]Adjust logic of upsert in HDFSParquetImporter (#1511)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84dd904  [HUDI-789]Adjust logic of upsert in HDFSParquetImporter (#1511)
84dd904 is described below

commit 84dd9047d3902650d7ff5bc95b9789d6880ca8e2
Author: hongdd <jn...@163.com>
AuthorDate: Tue Apr 21 14:21:30 2020 +0800

    [HUDI-789]Adjust logic of upsert in HDFSParquetImporter (#1511)
---
 .../apache/hudi/utilities/HDFSParquetImporter.java |  22 +-
 .../hudi/utilities/TestHDFSParquetImporter.java    | 255 ++++++++++++++++-----
 2 files changed, 217 insertions(+), 60 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index f389c58..4befaec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -100,6 +100,10 @@ public class HDFSParquetImporter implements Serializable {
 
   }
 
+  private boolean isUpsert() {
+    return "upsert".equalsIgnoreCase(cfg.command); // case-insensitive; false (no NPE) when command is null
+  }
+
   public int dataImport(JavaSparkContext jsc, int retry) {
     this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
     this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
@@ -108,7 +112,7 @@ public class HDFSParquetImporter implements Serializable {
     int ret = -1;
     try {
       // Verify that targetPath is not present.
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
       }
       do {
@@ -122,20 +126,22 @@ public class HDFSParquetImporter implements Serializable {
 
   protected int dataImport(JavaSparkContext jsc) throws IOException {
     try {
-      if (fs.exists(new Path(cfg.targetPath))) {
+      if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
         // cleanup target directory.
         fs.delete(new Path(cfg.targetPath), true);
       }
 
+      if (!fs.exists(new Path(cfg.targetPath))) {
+        // Initialize target hoodie table.
+        Properties properties = new Properties();
+        properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
+        properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
+        HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
+      }
+
       // Get schema.
       String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
 
-      // Initialize target hoodie table.
-      Properties properties = new Properties();
-      properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
-      properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
-      HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
-
       HoodieWriteClient client =
           UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
index c94edf3..a4711b5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -37,8 +38,13 @@ import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -50,8 +56,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -75,34 +83,43 @@ public class TestHDFSParquetImporter implements Serializable {
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     if (hdfsTestService != null) {
       hdfsTestService.stop();
     }
   }
 
+  private String basePath;
+  private transient Path hoodieFolder;
+  private transient Path srcFolder;
+  private transient List<GenericRecord> insertData;
+
+  @Before
+  public void init() throws IOException, ParseException {
+    basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+    // NOTE(review): getStackTrace()[1] is presumably init() itself, so basePath is ".../init" for every test.
+    // Hoodie root folder.
+    hoodieFolder = new Path(basePath, "testTarget");
+
+    // Write the source parquet data and keep the generated records for later verification.
+    srcFolder = new Path(basePath, "testSrc");
+    insertData = createInsertRecords(srcFolder);
+  }
+
+  @After
+  public void clean() throws IOException {
+    dfs.delete(new Path(basePath), true);
+  }
+
   /**
    * Test successful data import with retries.
    */
   @Test
   public void testImportWithRetries() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Create schema file.
       String schemaFile = new Path(basePath, "file.schema").toString();
 
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
       HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
           "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
       AtomicInteger retry = new AtomicInteger(3);
@@ -150,14 +167,104 @@ public class TestHDFSParquetImporter implements Serializable {
       for (Entry<String, Long> e : recordCounts.entrySet()) {
         assertEquals("missing records", 24, e.getValue().longValue());
       }
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
-  private void createRecords(Path srcFolder) throws ParseException, IOException {
+  private void insert(JavaSparkContext jsc) throws IOException {
+    // Create schema file.
+    String schemaFile = new Path(basePath, "file.schema").toString();
+    createSchemaFile(schemaFile);
+
+    HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+        "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+    HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+    dataImporter.dataImport(jsc, 0);
+  }
+
+  /**
+   * Test successful insert and verify data consistency.
+   */
+  @Test
+  public void testImportWithInsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      List<HoodieTripModel> expected = insertData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  /**
+   * Test that an upsert into an existing table updates overlapping records and adds the new ones.
+   */
+  @Test
+  public void testImportWithUpsert() throws IOException, ParseException {
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
+      insert(jsc);
+
+      // Path of the schema file already written by insert(jsc) above.
+      String schemaFile = new Path(basePath, "file.schema").toString();
+
+      Path upsertFolder = new Path(basePath, "testUpsertSrc");
+      List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+      HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+          "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+      cfg.command = "upsert";
+      HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+      dataImporter.dataImport(jsc, 0);
+
+      // Expected result: drop the first 11 inserted records (replaced by the upsert) and add the upsert data.
+      List<GenericRecord> expectData = new ArrayList<>(insertData.subList(11, 96)); // copy: keep insertData intact
+      expectData.addAll(upsertData);
+
+      // read latest data
+      SQLContext sqlContext = new SQLContext(jsc);
+      Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+      List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+      List<HoodieTripModel> result = readData.stream().map(row ->
+          new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+              row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+          .collect(Collectors.toList());
+
+      // get expected result.
+      List<HoodieTripModel> expected = expectData.stream().map(g ->
+          new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+              g.get("_row_key").toString(),
+              g.get("rider").toString(),
+              g.get("driver").toString(),
+              Double.parseDouble(g.get("begin_lat").toString()),
+              Double.parseDouble(g.get("begin_lon").toString()),
+              Double.parseDouble(g.get("end_lat").toString()),
+              Double.parseDouble(g.get("end_lon").toString())))
+          .collect(Collectors.toList());
+
+      assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+    }
+  }
+
+  private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
     Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
     long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
     List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -165,12 +272,36 @@ public class TestHDFSParquetImporter implements Serializable {
       records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
           "driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
     }
-    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
-        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
-    for (GenericRecord record : records) {
-      writer.write(record);
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
     }
-    writer.close();
+    return records;
+  }
+
+  private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+    Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+    long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+    List<GenericRecord> records = new ArrayList<GenericRecord>();
+    // 11 records (record numbers 0-10) intended as updates of already-imported records
+    for (long recordNum = 0; recordNum < 11; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    // 4 records (record numbers 96-99) intended as new inserts
+    for (long recordNum = 96; recordNum < 100; recordNum++) {
+      records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+          "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+    }
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+        .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+    return records;
   }
 
   private void createSchemaFile(String schemaFile) throws IOException {
@@ -184,12 +315,7 @@ public class TestHDFSParquetImporter implements Serializable {
    */
   @Test
   public void testSchemaFile() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Hoodie root folder
       Path hoodieFolder = new Path(basePath, "testTarget");
       Path srcFolder = new Path(basePath.toString(), "srcTest");
@@ -204,10 +330,6 @@ public class TestHDFSParquetImporter implements Serializable {
       // Should fail - return : -1.
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
 
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -216,19 +338,7 @@ public class TestHDFSParquetImporter implements Serializable {
    */
   @Test
   public void testRowAndPartitionKey() throws Exception {
-    JavaSparkContext jsc = null;
-    try {
-      jsc = getJavaSparkContext();
-
-      // Test root folder.
-      String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-      // Hoodie root folder
-      Path hoodieFolder = new Path(basePath, "testTarget");
-
-      // Create generic records.
-      Path srcFolder = new Path(basePath, "testSrc");
-      createRecords(srcFolder);
-
+    try (JavaSparkContext jsc = getJavaSparkContext()) {
       // Create schema file.
       Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
       createSchemaFile(schemaFile.toString());
@@ -248,10 +358,6 @@ public class TestHDFSParquetImporter implements Serializable {
       dataImporter = new HDFSParquetImporter(cfg);
       assertEquals(-1, dataImporter.dataImport(jsc, 0));
 
-    } finally {
-      if (jsc != null) {
-        jsc.stop();
-      }
     }
   }
 
@@ -275,4 +381,49 @@ public class TestHDFSParquetImporter implements Serializable {
     sparkConf = HoodieWriteClient.registerClasses(sparkConf);
     return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
   }
+
+  /** Value holder used to compare rows read back from the table with the expected records. */
+  public static class HoodieTripModel {
+    double timestamp;
+    String rowKey;
+    String rider;
+    String driver;
+    double beginLat;
+    double beginLon;
+    double endLat;
+    double endLon;
+
+    private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+        double beginLon, double endLat, double endLon) {
+      this.timestamp = timestamp;
+      this.rowKey = rowKey;
+      this.rider = rider;
+      this.driver = driver;
+      this.beginLat = beginLat;
+      this.beginLon = beginLon;
+      this.endLat = endLat;
+      this.endLon = endLon;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      HoodieTripModel other = (HoodieTripModel) o;
+      // Double.compare (not ==) keeps equals consistent with Objects.hash for -0.0 and NaN.
+      return Double.compare(timestamp, other.timestamp) == 0 && rowKey.equals(other.rowKey)
+          && rider.equals(other.rider) && driver.equals(other.driver)
+          && Double.compare(beginLat, other.beginLat) == 0 && Double.compare(beginLon, other.beginLon) == 0
+          && Double.compare(endLat, other.endLat) == 0 && Double.compare(endLon, other.endLon) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
+    }
+  }
 }