You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/01/04 19:07:37 UTC
[incubator-hudi] branch master updated: [HUDI-377] Adding Delete()
support to DeltaStreamer (#1073)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7031445 [HUDI-377] Adding Delete() support to DeltaStreamer (#1073)
7031445 is described below
commit 7031445eb3cae5a4557786c7eb080944320609aa
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sat Jan 4 11:07:31 2020 -0800
[HUDI-377] Adding Delete() support to DeltaStreamer (#1073)
- Provides ability to perform hard deletes by writing delete marker records into the source data
- If the record contains a special field _hoodie_is_deleted set to true, deletes are performed
---
.../hudi/common/HoodieTestDataGenerator.java | 83 ++++++++++++++++++----
.../main/java/org/apache/hudi/BaseAvroPayload.java | 7 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 16 ++---
.../hudi/OverwriteWithLatestAvroPayload.java | 12 +++-
.../hudi/utilities/deltastreamer/DeltaSync.java | 14 ++--
.../deltastreamer/SourceFormatAdapter.java | 11 +--
.../hudi/utilities/TestHoodieDeltaStreamer.java | 31 ++++----
.../utilities/sources/AbstractBaseTestSource.java | 25 +++++--
.../resources/delta-streamer-config/source.avsc | 8 ++-
.../sql-transformer.properties | 2 +-
.../resources/delta-streamer-config/target.avsc | 12 +++-
11 files changed, 150 insertions(+), 71 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index efc2444..ddeec6a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+
/**
* Class to be used in tests to keep generating test inserts and updates against a corpus.
* <p>
@@ -73,14 +74,15 @@ public class HoodieTestDataGenerator {
public static final String[] DEFAULT_PARTITION_PATHS =
{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
public static final int DEFAULT_PARTITION_DEPTH = 3;
- public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
- + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
- + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
- + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
- + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
- + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+ public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+ + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+ + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+ + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+ + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+ + "{\"name\":\"fare\",\"type\": \"double\"},"
+ + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
- public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
+ public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
@@ -118,6 +120,15 @@ public class HoodieTestDataGenerator {
}
/**
+ * Generates a new avro record of the above schema format for a delete.
+ */
+ public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
+ GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
+ true);
+ return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+ }
+
+ /**
* Generates a new avro record of the above schema format, retaining the key if optionally provided.
*/
public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commitTime) {
@@ -126,7 +137,12 @@ public class HoodieTestDataGenerator {
}
public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
- double timestamp) {
+ double timestamp) {
+ return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
+ }
+
+ public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
+ double timestamp, boolean isDeleteRecord) {
GenericRecord rec = new GenericData.Record(avroSchema);
rec.put("_row_key", rowKey);
rec.put("timestamp", timestamp);
@@ -137,12 +153,18 @@ public class HoodieTestDataGenerator {
rec.put("end_lat", rand.nextDouble());
rec.put("end_lon", rand.nextDouble());
rec.put("fare", rand.nextDouble() * 100);
+ if (isDeleteRecord) {
+ rec.put("_hoodie_is_deleted", true);
+ } else {
+ rec.put("_hoodie_is_deleted", false);
+ }
return rec;
}
public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
- HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> {
+ HoodieTimeline.makeRequestedCommitFileName(commitTime))
+ .forEach(f -> {
Path commitFile = new Path(
basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
FSDataOutputStream os = null;
@@ -176,7 +198,7 @@ public class HoodieTestDataGenerator {
}
public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
- Configuration configuration) throws IOException {
+ Configuration configuration) throws IOException {
Path commitFile =
new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
FileSystem fs = FSUtils.getFs(basePath, configuration);
@@ -332,7 +354,7 @@ public class HoodieTestDataGenerator {
* list
*
* @param commitTime Commit Timestamp
- * @param n Number of updates (including dups)
+ * @param n Number of updates (including dups)
* @return list of hoodie record updates
*/
public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
@@ -349,7 +371,7 @@ public class HoodieTestDataGenerator {
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
*
* @param commitTime Commit Timestamp
- * @param n Number of unique records
+ * @param n Number of unique records
* @return list of hoodie record updates
*/
public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) {
@@ -370,7 +392,7 @@ public class HoodieTestDataGenerator {
* Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
*
* @param commitTime Commit Timestamp
- * @param n Number of unique records
+ * @param n Number of unique records
* @return stream of hoodie record updates
*/
public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
@@ -418,11 +440,46 @@ public class HoodieTestDataGenerator {
index = (index + 1) % numExistingKeys;
kp = existingKeys.get(index);
}
+ existingKeys.remove(kp);
+ numExistingKeys--;
used.add(kp);
return kp.key;
});
}
+ /**
+ * Generates deduped delete records previously inserted, randomly distributed across the keys above.
+ *
+ * @param commitTime Commit Timestamp
+ * @param n Number of unique records
+ * @return stream of hoodie records for delete
+ */
+ public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
+ final Set<KeyPartition> used = new HashSet<>();
+
+ if (n > numExistingKeys) {
+ throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
+ }
+
+ return IntStream.range(0, n).boxed().map(i -> {
+ int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
+ KeyPartition kp = existingKeys.get(index);
+ // Find the available keyPartition starting from randomly chosen one.
+ while (used.contains(kp)) {
+ index = (index + 1) % numExistingKeys;
+ kp = existingKeys.get(index);
+ }
+ existingKeys.remove(kp);
+ numExistingKeys--;
+ used.add(kp);
+ try {
+ return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ });
+ }
+
public String[] getPartitionPaths() {
return partitionPaths;
}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
index b133560..3bf07d9 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
@@ -31,7 +31,6 @@ import java.io.Serializable;
* Base class for all AVRO record based payloads, that can be ordered based on a field.
*/
public abstract class BaseAvroPayload implements Serializable {
-
/**
* Avro data extracted from the source converted to bytes.
*/
@@ -43,8 +42,10 @@ public abstract class BaseAvroPayload implements Serializable {
protected final Comparable orderingVal;
/**
- * @param record
- * @param orderingVal
+ * Instantiate {@link BaseAvroPayload}.
+ *
+ * @param record Generic record for the payload.
+ * @param orderingVal {@link Comparable} to be used in pre combine.
*/
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
try {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 1dbd944..b06a9ae 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -103,7 +103,7 @@ public class DataSourceUtils {
/**
* Create a key generator class via reflection, passing in any configs needed.
- *
+ * <p>
* If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
* corresponding key generator class; otherwise, use the default key generator class specified in {@code
* DataSourceWriteOptions}.
@@ -125,7 +125,7 @@ public class DataSourceUtils {
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
- new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
+ new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
@@ -140,7 +140,7 @@ public class DataSourceUtils {
}
public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
- String tblName, Map<String, String> parameters) {
+ String tblName, Map<String, String> parameters) {
// inline compaction is on by default for MOR
boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
@@ -162,7 +162,7 @@ public class DataSourceUtils {
}
public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
- String commitTime, String operation) {
+ String commitTime, String operation) {
if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
return client.bulkInsert(hoodieRecords, commitTime);
} else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
@@ -174,19 +174,19 @@ public class DataSourceUtils {
}
public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
- String commitTime) {
+ String commitTime) {
return client.delete(hoodieKeys, commitTime);
}
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
- String payloadClass) throws IOException {
+ String payloadClass) throws IOException {
HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
return new HoodieRecord<>(hKey, payload);
}
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
- HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+ HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
HoodieReadClient client = null;
try {
client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
@@ -205,7 +205,7 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
- Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+ Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
diff --git a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
index e860837..32d584e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
@@ -38,8 +38,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
/**
- * @param record
- * @param orderingVal
+ *
*/
public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
@@ -61,8 +60,15 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+
+ GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get();
// combining strategy here trivially ignores currentValue on disk and writes this record
- return getInsertValue(schema);
+ Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+ if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+ return Option.empty();
+ } else {
+ return Option.of(genericRecord);
+ }
}
@Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7dfb015..a001323 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -158,8 +158,8 @@ public class DeltaSync implements Serializable {
private final HoodieTableType tableType;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
- HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
- Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+ HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+ Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
this.cfg = cfg;
this.jssc = jssc;
@@ -288,7 +288,7 @@ public class DeltaSync implements Serializable {
// default to RowBasedSchemaProvider
schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
- dataAndCheckpoint.getSchemaProvider())
+ dataAndCheckpoint.getSchemaProvider())
: this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
@@ -316,22 +316,22 @@ public class DeltaSync implements Serializable {
(Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});
+
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
*
- * @param records Input Records
+ * @param records Input Records
* @param checkpointStr Checkpoint String
- * @param metrics Metrics
+ * @param metrics Metrics
* @return Option Compaction instant if one is scheduled
*/
private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
- HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
+ HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
Option<String> scheduledCompactionInstant = Option.empty();
-
// filter dupes if needed
if (cfg.filterDupes) {
// turn upserts to insert
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index a21d263..dd266ed 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -50,10 +50,6 @@ public final class SourceFormatAdapter {
/**
* Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
- *
- * @param lastCkptStr
- * @param sourceLimit
- * @return
*/
public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
@@ -78,10 +74,6 @@ public final class SourceFormatAdapter {
/**
* Fetch new data in row format. If the source provides data in different format, they are translated to Row format
- *
- * @param lastCkptStr
- * @param sourceLimit
- * @return
*/
public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
switch (source.getSourceType()) {
@@ -95,7 +87,8 @@ public final class SourceFormatAdapter {
.ofNullable(
r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
- source.getSparkSession()))
+ source.getSparkSession())
+ )
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index c5f6c76..51e5b3c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -179,7 +179,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
- String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
+ String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips";
@@ -198,7 +198,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
- boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
+ boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
cfg.targetBasePath = basePath;
cfg.targetTableName = "hoodie_trips_copy";
@@ -352,11 +352,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
- TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
- assertEquals(2000, counts.get(0).getLong(1));
+ assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
}
@Test
@@ -396,8 +396,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else {
TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
}
- TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
- TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
return true;
}, 180);
ds.shutdownGracefully();
@@ -457,12 +457,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.sourceLimit = 2000;
cfg.operation = Operation.UPSERT;
new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
- TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
- TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
- TestHelpers.assertDistanceCountWithExactValue(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+ TestHelpers.assertDistanceCountWithExactValue(1950, datasetBasePath + "/*/*.parquet", sqlContext);
lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
- assertEquals(2000, counts.get(0).getLong(1));
+ assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Incrementally pull changes in upstream hudi table and apply to downstream table
downstreamCfg =
@@ -476,7 +476,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
String finalInstant =
TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
- assertEquals(2000, counts.get(0).getLong(1));
+ assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
// Test Hive integration
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
@@ -566,12 +566,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
/**
* Returns some random number as distance between the points.
- *
+ *
* @param lat1 Latitiude of source
* @param lat2 Latitude of destination
* @param lon1 Longitude of source
* @param lon2 Longitude of destination
- * @return
*/
@Override
public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
@@ -586,7 +585,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
- TypedProperties properties) {
+ TypedProperties properties) {
rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
@@ -607,7 +606,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
- TypedProperties properties) {
+ TypedProperties properties) {
System.out.println("DropAllTransformer called !!");
return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 745b0f0..4eab369 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -76,12 +76,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
}
protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
- SchemaProvider schemaProvider) {
+ SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
- int partition) {
+ int partition) {
int maxUniqueKeys =
props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
@@ -94,10 +94,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates;
LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
+ boolean reachedMax = false;
if (numInserts + numExistingKeys > maxUniqueKeys) {
// Limit inserts so that maxUniqueRecords is maintained
numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
+ reachedMax = true;
}
if ((numInserts + numUpdates) < sourceLimit) {
@@ -105,16 +107,25 @@ public abstract class AbstractBaseTestSource extends AvroSource {
numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
}
- LOG.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+ Stream<GenericRecord> deleteStream = Stream.empty();
+ Stream<GenericRecord> updateStream;
long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+ ", Free Memory=" + Runtime.getRuntime().freeMemory());
-
- Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
- .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+ if (!reachedMax && numUpdates >= 50) {
+ LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
+ + maxUniqueKeys);
+ // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
+ deleteStream = dataGenerator.generateUniqueDeleteRecordStream(commitTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+ updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+ } else {
+ LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+ updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
+ .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+ }
Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
.map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
- return Stream.concat(updateStream, insertStream);
+ return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
}
private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 2796e08..95757a3 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -43,9 +43,15 @@
}, {
"name" : "end_lon",
"type" : "double"
- }, {
+ },
+ {
"name" : "fare",
"type" : "double"
+ },
+ {
+ "name" : "_hoodie_is_deleted",
+ "type" : "boolean",
+ "default" : false
} ]
}
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index 16a09ed..e8b2857 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
# limitations under the License.
###
include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 11e23a4..38e7255 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -46,9 +46,15 @@
}, {
"name" : "fare",
"type" : "double"
- }, {
- "name" : "haversine_distance",
- "type" : "double"
+ },
+ {
+ "name" : "_hoodie_is_deleted",
+ "type" : "boolean",
+ "default" : false
+ },
+ {
+ "name" : "haversine_distance",
+ "type" : "double"
}]
}