Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/21 06:21:37 UTC
[incubator-hudi] branch master updated: [HUDI-789] Adjust logic of upsert in HDFSParquetImporter (#1511)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84dd904 [HUDI-789] Adjust logic of upsert in HDFSParquetImporter (#1511)
84dd904 is described below
commit 84dd9047d3902650d7ff5bc95b9789d6880ca8e2
Author: hongdd <jn...@163.com>
AuthorDate: Tue Apr 21 14:21:30 2020 +0800
[HUDI-789] Adjust logic of upsert in HDFSParquetImporter (#1511)
---
.../apache/hudi/utilities/HDFSParquetImporter.java | 22 +-
.../hudi/utilities/TestHDFSParquetImporter.java | 255 ++++++++++++++++-----
2 files changed, 217 insertions(+), 60 deletions(-)
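In short: before this change, dataImport always treated an existing targetPath as an error and re-initialized the Hudi table on every run; after it, an existing table is accepted whenever the command is "upsert", and table initialization happens only when the target does not yet exist. A minimal standalone sketch of that init-if-absent flow follows; the class, method, and path names are illustrative only, the string property keys are (to the best of my knowledge) the values behind HoodieTableConfig's HOODIE_TABLE_NAME_PROP_NAME and HOODIE_TABLE_TYPE_PROP_NAME constants, and the import paths assume the Hudi tree at this commit.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;

class InitIfAbsentSketch {
  // upsert == true keeps an existing table; upsert == false forces a fresh import.
  static void prepareTarget(FileSystem fs, Configuration conf, Path target, boolean upsert)
      throws Exception {
    if (fs.exists(target) && !upsert) {
      fs.delete(target, true); // fresh import: clear any stale output first
    }
    if (!fs.exists(target)) {
      // First write (a plain import, or an upsert into a brand-new table):
      // initialize the Hudi table exactly once.
      Properties properties = new Properties();
      properties.put("hoodie.table.name", "testTable");     // HOODIE_TABLE_NAME_PROP_NAME
      properties.put("hoodie.table.type", "COPY_ON_WRITE"); // HOODIE_TABLE_TYPE_PROP_NAME
      HoodieTableMetaClient.initTableAndGetMetaClient(conf, target.toString(), properties);
    }
  }
}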
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index f389c58..4befaec 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -100,6 +100,10 @@ public class HDFSParquetImporter implements Serializable {
}
+ private boolean isUpsert() {
+ return "upsert".equals(cfg.command.toLowerCase());
+ }
+
public int dataImport(JavaSparkContext jsc, int retry) {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
@@ -108,7 +112,7 @@ public class HDFSParquetImporter implements Serializable {
int ret = -1;
try {
// Verify that targetPath is not present.
- if (fs.exists(new Path(cfg.targetPath))) {
+ if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
throw new HoodieIOException(String.format("Make sure %s is not present.", cfg.targetPath));
}
do {
@@ -122,20 +126,22 @@ public class HDFSParquetImporter implements Serializable {
protected int dataImport(JavaSparkContext jsc) throws IOException {
try {
- if (fs.exists(new Path(cfg.targetPath))) {
+ if (fs.exists(new Path(cfg.targetPath)) && !isUpsert()) {
// cleanup target directory.
fs.delete(new Path(cfg.targetPath), true);
}
+ if (!fs.exists(new Path(cfg.targetPath))) {
+ // Initialize target hoodie table.
+ Properties properties = new Properties();
+ properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
+ properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
+ HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
+ }
+
// Get schema.
String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
- // Initialize target hoodie table.
- Properties properties = new Properties();
- properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, cfg.tableName);
- properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, cfg.tableType);
- HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), cfg.targetPath, properties);
-
HoodieWriteClient client =
UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr, cfg.parallelism, Option.empty(), props);
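One sharp edge in the new isUpsert() helper above: cfg.command.toLowerCase() throws a NullPointerException if command was never set. A null-safe equivalent (a sketch, not part of this commit) flips the comparison to equalsIgnoreCase:

  // "upsert".equalsIgnoreCase(null) is simply false, whereas
  // cfg.command.toLowerCase() throws an NPE on a null command.
  private boolean isUpsert() {
    return "upsert".equalsIgnoreCase(cfg.command);
  }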
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
index c94edf3..a4711b5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHDFSParquetImporter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieTestUtils;
@@ -37,8 +38,13 @@ import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
+
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -50,8 +56,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -75,34 +83,43 @@ public class TestHDFSParquetImporter implements Serializable {
}
@AfterClass
- public static void cleanupClass() throws Exception {
+ public static void cleanupClass() {
if (hdfsTestService != null) {
hdfsTestService.stop();
}
}
+ private String basePath;
+ private transient Path hoodieFolder;
+ private transient Path srcFolder;
+ private transient List<GenericRecord> insertData;
+
+ @Before
+ public void init() throws IOException, ParseException {
+ basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+
+ // Hoodie root folder.
+ hoodieFolder = new Path(basePath, "testTarget");
+
+ // Create generic records.
+ srcFolder = new Path(basePath, "testSrc");
+ insertData = createInsertRecords(srcFolder);
+ }
+
+ @After
+ public void clean() throws IOException {
+ dfs.delete(new Path(basePath), true);
+ }
+
/**
* Test successful data import with retries.
*/
@Test
public void testImportWithRetries() throws Exception {
- JavaSparkContext jsc = null;
- try {
- jsc = getJavaSparkContext();
-
- // Test root folder.
- String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
-
- // Hoodie root folder
- Path hoodieFolder = new Path(basePath, "testTarget");
-
+ try (JavaSparkContext jsc = getJavaSparkContext()) {
// Create schema file.
String schemaFile = new Path(basePath, "file.schema").toString();
- // Create generic records.
- Path srcFolder = new Path(basePath, "testSrc");
- createRecords(srcFolder);
-
HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
"testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
AtomicInteger retry = new AtomicInteger(3);
@@ -150,14 +167,104 @@ public class TestHDFSParquetImporter implements Serializable {
for (Entry<String, Long> e : recordCounts.entrySet()) {
assertEquals("missing records", 24, e.getValue().longValue());
}
- } finally {
- if (jsc != null) {
- jsc.stop();
- }
}
}
- private void createRecords(Path srcFolder) throws ParseException, IOException {
+ private void insert(JavaSparkContext jsc) throws IOException {
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+ createSchemaFile(schemaFile);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(srcFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+ }
+
+ /**
+ * Test successful insert and verify data consistency.
+ */
+ @Test
+ public void testImportWithInsert() throws IOException, ParseException {
+ try (JavaSparkContext jsc = getJavaSparkContext()) {
+ insert(jsc);
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieTripModel> result = readData.stream().map(row ->
+ new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ List<HoodieTripModel> expected = insertData.stream().map(g ->
+ new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+ g.get("_row_key").toString(),
+ g.get("rider").toString(),
+ g.get("driver").toString(),
+ Double.parseDouble(g.get("begin_lat").toString()),
+ Double.parseDouble(g.get("begin_lon").toString()),
+ Double.parseDouble(g.get("end_lat").toString()),
+ Double.parseDouble(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ }
+ }
+
+ /**
+ * Test upsert data and verify data consistency.
+ */
+ @Test
+ public void testImportWithUpsert() throws IOException, ParseException {
+ try (JavaSparkContext jsc = getJavaSparkContext()) {
+ insert(jsc);
+
+ // Create schema file.
+ String schemaFile = new Path(basePath, "file.schema").toString();
+
+ Path upsertFolder = new Path(basePath, "testUpsertSrc");
+ List<GenericRecord> upsertData = createUpsertRecords(upsertFolder);
+
+ HDFSParquetImporter.Config cfg = getHDFSParquetImporterConfig(upsertFolder.toString(), hoodieFolder.toString(),
+ "testTable", "COPY_ON_WRITE", "_row_key", "timestamp", 1, schemaFile);
+ cfg.command = "upsert";
+ HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
+
+ dataImporter.dataImport(jsc, 0);
+
+ // Construct the expected result: drop the 11 updated records (keys 0-10) and add the upsert data.
+ List<GenericRecord> expectData = insertData.subList(11, 96);
+ expectData.addAll(upsertData);
+
+ // read latest data
+ SQLContext sqlContext = new SQLContext(jsc);
+ Dataset<Row> ds = HoodieClientTestUtils.read(jsc, basePath + "/testTarget", sqlContext, dfs, basePath + "/testTarget/*/*/*/*");
+
+ List<Row> readData = ds.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon").collectAsList();
+ List<HoodieTripModel> result = readData.stream().map(row ->
+ new HoodieTripModel(row.getDouble(0), row.getString(1), row.getString(2), row.getString(3), row.getDouble(4),
+ row.getDouble(5), row.getDouble(6), row.getDouble(7)))
+ .collect(Collectors.toList());
+
+ // get expected result.
+ List<HoodieTripModel> expected = expectData.stream().map(g ->
+ new HoodieTripModel(Double.parseDouble(g.get("timestamp").toString()),
+ g.get("_row_key").toString(),
+ g.get("rider").toString(),
+ g.get("driver").toString(),
+ Double.parseDouble(g.get("begin_lat").toString()),
+ Double.parseDouble(g.get("begin_lon").toString()),
+ Double.parseDouble(g.get("end_lat").toString()),
+ Double.parseDouble(g.get("end_lon").toString())))
+ .collect(Collectors.toList());
+
+ assertTrue(result.containsAll(expected) && expected.containsAll(result) && result.size() == expected.size());
+ }
+ }
+
+ private List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>();
@@ -165,12 +272,36 @@ public class TestHDFSParquetImporter implements Serializable {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-" + recordNum,
"driver-" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
}
- ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
- .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build();
- for (GenericRecord record : records) {
- writer.write(record);
+ try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+ .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+ for (GenericRecord record : records) {
+ writer.write(record);
+ }
}
- writer.close();
+ return records;
+ }
+
+ private List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
+ Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
+ long startTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse("20170203000000").getTime() / 1000;
+ List<GenericRecord> records = new ArrayList<GenericRecord>();
+ // 11 records for update (keys 0-10)
+ for (long recordNum = 0; recordNum < 11; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ // 4 new records for insert (keys 96-99)
+ for (long recordNum = 96; recordNum < 100; recordNum++) {
+ records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "rider-upsert-" + recordNum,
+ "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum)));
+ }
+ try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(srcFile)
+ .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+ for (GenericRecord record : records) {
+ writer.write(record);
+ }
+ }
+ return records;
}
private void createSchemaFile(String schemaFile) throws IOException {
@@ -184,12 +315,7 @@ public class TestHDFSParquetImporter implements Serializable {
*/
@Test
public void testSchemaFile() throws Exception {
- JavaSparkContext jsc = null;
- try {
- jsc = getJavaSparkContext();
-
- // Test root folder.
- String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
+ try (JavaSparkContext jsc = getJavaSparkContext()) {
// Hoodie root folder
Path hoodieFolder = new Path(basePath, "testTarget");
Path srcFolder = new Path(basePath.toString(), "srcTest");
@@ -204,10 +330,6 @@ public class TestHDFSParquetImporter implements Serializable {
// Should fail - return : -1.
assertEquals(-1, dataImporter.dataImport(jsc, 0));
- } finally {
- if (jsc != null) {
- jsc.stop();
- }
}
}
@@ -216,19 +338,7 @@ public class TestHDFSParquetImporter implements Serializable {
*/
@Test
public void testRowAndPartitionKey() throws Exception {
- JavaSparkContext jsc = null;
- try {
- jsc = getJavaSparkContext();
-
- // Test root folder.
- String basePath = (new Path(dfsBasePath, Thread.currentThread().getStackTrace()[1].getMethodName())).toString();
- // Hoodie root folder
- Path hoodieFolder = new Path(basePath, "testTarget");
-
- // Create generic records.
- Path srcFolder = new Path(basePath, "testSrc");
- createRecords(srcFolder);
-
+ try (JavaSparkContext jsc = getJavaSparkContext()) {
// Create schema file.
Path schemaFile = new Path(basePath.toString(), "missingFile.schema");
createSchemaFile(schemaFile.toString());
@@ -248,10 +358,6 @@ public class TestHDFSParquetImporter implements Serializable {
dataImporter = new HDFSParquetImporter(cfg);
assertEquals(-1, dataImporter.dataImport(jsc, 0));
- } finally {
- if (jsc != null) {
- jsc.stop();
- }
}
}
@@ -275,4 +381,49 @@ public class TestHDFSParquetImporter implements Serializable {
sparkConf = HoodieWriteClient.registerClasses(sparkConf);
return new JavaSparkContext(HoodieReadClient.addHoodieSupport(sparkConf));
}
+
+ /**
+ * Helper class used to compare the read-back results against the expected records.
+ */
+ public static class HoodieTripModel {
+ double timestamp;
+ String rowKey;
+ String rider;
+ String driver;
+ double beginLat;
+ double beginLon;
+ double endLat;
+ double endLon;
+
+ private HoodieTripModel(double timestamp, String rowKey, String rider, String driver, double beginLat,
+ double beginLon, double endLat, double endLon) {
+ this.timestamp = timestamp;
+ this.rowKey = rowKey;
+ this.rider = rider;
+ this.driver = driver;
+ this.beginLat = beginLat;
+ this.beginLon = beginLon;
+ this.endLat = endLat;
+ this.endLon = endLon;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HoodieTripModel other = (HoodieTripModel) o;
+ return timestamp == other.timestamp && rowKey.equals(other.rowKey) && rider.equals(other.rider)
+ && driver.equals(other.driver) && beginLat == other.beginLat && beginLon == other.beginLon
+ && endLat == other.endLat && endLon == other.endLon;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timestamp, rowKey, rider, driver, beginLat, beginLon, endLat, endLon);
+ }
+ }
}
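One last note on testImportWithUpsert above: insertData.subList(11, 96) returns a view backed by insertData, so the addAll on the next line writes through into the fixture list. That is harmless here because init() rebuilds insertData before every test, but a defensive copy (a sketch, not part of this commit) makes the intent explicit:

  // Copying the slice decouples expectData from the insertData fixture;
  // addAll then grows only the copy, leaving insertData untouched.
  List<GenericRecord> expectData = new ArrayList<>(insertData.subList(11, 96));
  expectData.addAll(upsertData);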