You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/28 00:44:55 UTC
[hudi] branch master updated: [HUDI-531] Add java doc for hudi test
suite general classes (#1900)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fa81248 [HUDI-531] Add java doc for hudi test suite general classes (#1900)
fa81248 is described below
commit fa812482473f0cc8c2f34e2db07366cc3e5f7066
Author: Mathieu <wx...@126.com>
AuthorDate: Fri Aug 28 08:44:40 2020 +0800
[HUDI-531] Add java doc for hudi test suite general classes (#1900)
---
.../org/apache/hudi/client/HoodieWriteClient.java | 2 +-
hudi-integ-test/README.md | 6 +--
.../hudi/integ/testsuite/converter/Converter.java | 6 +++
.../integ/testsuite/dag/nodes/BulkInsertNode.java | 3 ++
.../hudi/integ/testsuite/dag/nodes/CleanNode.java | 4 ++
.../integ/testsuite/dag/nodes/CompactNode.java | 10 ++++
.../hudi/integ/testsuite/dag/nodes/DagNode.java | 8 ++-
.../integ/testsuite/dag/nodes/HiveQueryNode.java | 3 ++
.../integ/testsuite/dag/nodes/HiveSyncNode.java | 3 ++
.../hudi/integ/testsuite/dag/nodes/InsertNode.java | 3 ++
.../integ/testsuite/dag/nodes/RollbackNode.java | 9 ++++
.../testsuite/dag/nodes/ScheduleCompactNode.java | 3 ++
.../testsuite/dag/nodes/SparkSQLQueryNode.java | 9 ++++
.../hudi/integ/testsuite/dag/nodes/UpsertNode.java | 3 ++
.../integ/testsuite/dag/nodes/ValidateNode.java | 9 ++++
.../testsuite/dag/scheduler/DagScheduler.java | 21 ++++++++
.../integ/testsuite/generator/DeltaGenerator.java | 2 +-
.../GenericRecordFullPayloadGenerator.java | 58 ++++++++++++++++++++++
.../GenericRecordFullPayloadSizeEstimator.java | 12 +++++
.../generator/UpdateGeneratorIterator.java | 3 ++
.../integ/testsuite/writer/DeltaWriterAdapter.java | 3 ++
21 files changed, 174 insertions(+), 6 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 9f6df7b..142ff33 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -557,7 +557,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
- + " cleanerElaspsedMs" + durationMs);
+ + " cleanerElapsedMs" + durationMs);
}
return metadata;
}
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index d87fec3..a497ad9 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -41,7 +41,7 @@ Depending on the type of workload generated, data is either ingested into the ta
dataset or the corresponding workload operation is executed. For example compaction does not necessarily need a workload
to be generated/ingested but can require an execution.
-## Other actions/operatons
+## Other actions/operations
The test suite supports different types of operations besides ingestion such as Hive Query execution, Clean action etc.
@@ -66,9 +66,9 @@ link#HudiDeltaStreamer page to learn about all the available configs applicable
There are 2 ways to generate a workload pattern
- 1.Programatically
+ 1.Programmatically
-Choose to write up the entire DAG of operations programatically, take a look at `WorkflowDagGenerator` class.
+Choose to write up the entire DAG of operations programmatically, take a look at `WorkflowDagGenerator` class.
Once you're ready with the DAG you want to execute, simply pass the class name as follows:
```
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/Converter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/Converter.java
index e4ad0a7..89f3b88 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/Converter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/Converter.java
@@ -29,5 +29,11 @@ import org.apache.spark.api.java.JavaRDD;
*/
public interface Converter<I, O> extends Serializable {
+ /**
+ * Convert data from one format to another.
+ *
+ * @param inputRDD Input data
+ * @return Data in target format
+ */
JavaRDD<O> convert(JavaRDD<I> inputRDD);
}
\ No newline at end of file
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BulkInsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BulkInsertNode.java
index 7a8f405..bdf57f8 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BulkInsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BulkInsertNode.java
@@ -24,6 +24,9 @@ import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.spark.api.java.JavaRDD;
+/**
+ * Represents a bulk insert node in the DAG of operations for a workflow.
+ */
public class BulkInsertNode extends InsertNode {
public BulkInsertNode(Config config) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
index 7083b47..2c0fcba 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java
@@ -20,6 +20,10 @@ package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+/**
+ * Represents a clean node in the DAG of operations for a workflow. Clean up any stale/old files/data lying around
+ * (either on file storage or index storage) based on configurations and CleaningPolicy used.
+ */
public class CleanNode extends DagNode<Boolean> {
public CleanNode() {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
index 92fe53c..4c3ad61 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
@@ -26,12 +26,22 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.spark.api.java.JavaRDD;
+/**
+ * Represents a compact node in the DAG of operations for a workflow.
+ */
public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
public CompactNode(Config config) {
this.config = config;
}
+ /**
+ * Method helps to start the compact operation. It will compact the last pending compact instant in the timeline
+ * if it has one.
+ *
+ * @param executionContext Execution context to run this compaction
+ * @throws Exception will be thrown if any error occurred.
+ */
@Override
public void execute(ExecutionContext executionContext) throws Exception {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
index aa54dc9..df54b4c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java
@@ -29,7 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Represents a Node in the DAG of operations for a workflow.
+ * Base abstraction of a compute node in the DAG of operations for a workflow.
*/
public abstract class DagNode<O> implements Comparable<DagNode<O>> {
@@ -76,6 +76,12 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
this.parentNodes = parentNodes;
}
+ /**
+ * Execute the {@link DagNode}.
+ *
+ * @param context The context needed for an execution of a node.
+ * @throws Exception Thrown if the execution failed.
+ */
public abstract void execute(ExecutionContext context) throws Exception;
public boolean isCompleted() {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 04f8c2e..f36b7d4 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -30,6 +30,9 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
+/**
+ * A hive query node in the DAG of operations for a workflow. Used to perform a hive query with given config.
+ */
public class HiveQueryNode extends DagNode<Boolean> {
private HiveServiceProvider hiveServiceProvider;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
index f24da14..f52a918 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java
@@ -22,6 +22,9 @@ import org.apache.hudi.integ.testsuite.helpers.HiveServiceProvider;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+/**
+ * Represents a hive sync node in the DAG of operations for a workflow. Helps to sync hoodie data to hive table.
+ */
public class HiveSyncNode extends DagNode<Boolean> {
private HiveServiceProvider hiveServiceProvider;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
index 23cb285..62db5b6 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java
@@ -26,6 +26,9 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.spark.api.java.JavaRDD;
+/**
+ * An insert node in the DAG of operations for a workflow.
+ */
public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
public InsertNode(Config config) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index b6d828a..cf96961 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -24,12 +24,21 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+/**
+ * A rollback node in the DAG helps to perform rollback operations.
+ */
public class RollbackNode extends DagNode<Option<HoodieInstant>> {
public RollbackNode(Config config) {
this.config = config;
}
+ /**
+ * Method helps to rollback the last commit instant in the timeline, if it has one.
+ *
+ * @param executionContext Execution context to perform this rollback
+ * @throws Exception will be thrown if any error occurred
+ */
@Override
public void execute(ExecutionContext executionContext) throws Exception {
log.info("Executing rollback node {}", this.getName());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
index 93502ae..0aa67f4 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
@@ -25,6 +25,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+/**
+ * A schedule node in the DAG of operations for a workflow helps to schedule compact operation.
+ */
public class ScheduleCompactNode extends DagNode<Option<String>> {
public ScheduleCompactNode(Config config) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
index a8a1b72..e06d6de 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java
@@ -26,6 +26,9 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+/**
+ * A SparkSQL query node in the DAG of operations for a workflow.
+ */
public class SparkSQLQueryNode extends DagNode<Boolean> {
HiveServiceProvider hiveServiceProvider;
@@ -35,6 +38,12 @@ public class SparkSQLQueryNode extends DagNode<Boolean> {
this.hiveServiceProvider = new HiveServiceProvider(config);
}
+ /**
+ * Method helps to execute a sparkSql query from a hive table.
+ *
+ * @param executionContext Execution context to perform this query.
+ * @throws Exception will be thrown if any error occurred
+ */
@Override
public void execute(ExecutionContext executionContext) throws Exception {
log.info("Executing spark sql query node");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java
index 3872f76..1377a4d 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java
@@ -25,6 +25,9 @@ import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.apache.spark.api.java.JavaRDD;
+/**
+ * Represents an upsert node in the DAG of operations for a workflow.
+ */
public class UpsertNode extends InsertNode {
public UpsertNode(Config config) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
index 5ba0249..37244c0 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java
@@ -23,6 +23,9 @@ import java.util.function.Function;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+/**
+ * A validate node helps to validate its parent nodes with given function.
+ */
public class ValidateNode<R> extends DagNode {
protected Function<List<DagNode>, R> function;
@@ -32,6 +35,12 @@ public class ValidateNode<R> extends DagNode {
this.config = config;
}
+ /**
+ * Method to start the validate operation. Exceptions will be thrown if its parent nodes exist and WAIT_FOR_PARENTS
+ * was set to true or default, but the parent nodes have not completed yet.
+ *
+ * @param executionContext Context to execute this node
+ */
@Override
public void execute(ExecutionContext executionContext) {
if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS",
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
index 7d78d83..280f301 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java
@@ -37,6 +37,10 @@ import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * The Dag scheduler schedules the workflow DAGs. It will convert DAG to node set and execute the nodes according to
+ * the relations between nodes.
+ */
public class DagScheduler {
private static Logger log = LoggerFactory.getLogger(DagScheduler.class);
@@ -48,6 +52,11 @@ public class DagScheduler {
this.executionContext = new ExecutionContext(null, hoodieTestSuiteWriter, deltaGenerator);
}
+ /**
+ * Method to start executing workflow DAGs.
+ *
+ * @throws Exception Thrown if schedule failed.
+ */
public void schedule() throws Exception {
ExecutorService service = Executors.newFixedThreadPool(2);
try {
@@ -61,6 +70,13 @@ public class DagScheduler {
}
}
+ /**
+ * Method to start executing the nodes in workflow DAGs.
+ *
+ * @param service ExecutorService
+ * @param nodes Nodes to be executed
+ * @throws Exception will be thrown if any error occurred
+ */
private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
// Nodes at the same level are executed in parallel
Queue<DagNode> queue = new PriorityQueue<>(nodes);
@@ -84,6 +100,11 @@ public class DagScheduler {
log.info("Finished workloads");
}
+ /**
+ * Execute the given node.
+ *
+ * @param node The node to be executed
+ */
private void executeNode(DagNode node) {
if (node.isCompleted()) {
throw new RuntimeException("DagNode already completed! Cannot re-execute");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
index c9d129e..3b7b114 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
@@ -57,7 +57,7 @@ import scala.Tuple2;
*/
public class DeltaGenerator implements Serializable {
- private static Logger log = LoggerFactory.getLogger(DFSHoodieDatasetInputReader.class);
+ private static Logger log = LoggerFactory.getLogger(DeltaGenerator.class);
private DeltaConfig deltaOutputConfig;
private transient JavaSparkContext jsc;
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
index 706ffd2..5ef77b2 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
@@ -98,14 +98,32 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
}
+ /**
+ * Create a new {@link GenericRecord} with random value according to given schema.
+ *
+ * @return {@link GenericRecord} with random value
+ */
public GenericRecord getNewPayload() {
return convert(baseSchema);
}
+ /**
+ * Update a given {@link GenericRecord} with random value. The fields in {@code blacklistFields} will not be updated.
+ *
+ * @param record GenericRecord to update
+ * @param blacklistFields Fields whose value should not be touched
+ * @return The updated {@link GenericRecord}
+ */
public GenericRecord getUpdatePayload(GenericRecord record, List<String> blacklistFields) {
return randomize(record, blacklistFields);
}
+ /**
+ * Create a {@link GenericRecord} with random value according to given schema.
+ *
+ * @param schema Schema to create record with
+ * @return {@link GenericRecord} with random value
+ */
protected GenericRecord convert(Schema schema) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
@@ -114,6 +132,13 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return result;
}
+ /**
+ * Create a new {@link GenericRecord} with random values. Which fields receive a value is chosen at random, and
+ * the values themselves are random too.
+ *
+ * @param schema Schema to create with.
+ * @return A {@link GenericRecord} with random value.
+ */
protected GenericRecord convertPartial(Schema schema) {
GenericRecord result = new GenericData.Record(schema);
for (Schema.Field f : schema.getFields()) {
@@ -128,6 +153,14 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return result;
}
+ /**
+ * Set random value to {@link GenericRecord} according to the schema type of field.
+ * The field in blacklist will not be set.
+ *
+ * @param record GenericRecord to randomize.
+ * @param blacklistFields blacklistFields where the field will not be randomized.
+ * @return Randomized GenericRecord.
+ */
protected GenericRecord randomize(GenericRecord record, List<String> blacklistFields) {
for (Schema.Field f : record.getSchema().getFields()) {
if (blacklistFields == null || !blacklistFields.contains(f.name())) {
@@ -137,6 +170,9 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
return record;
}
+ /**
+ * Generate random value according to their type.
+ */
private Object typeConvert(Schema schema) {
Schema localSchema = schema;
if (isOption(schema)) {
@@ -215,10 +251,26 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
}
+ /**
+ * Validate whether the record matches the schema.
+ *
+ * @param record Record to validate.
+ * @return True if matches.
+ */
public boolean validate(GenericRecord record) {
return genericData.validate(baseSchema, record);
}
+ /**
+ * Check whether a schema is option.
+ * Returns true if it matches the following:
+ * 1. Its type is Type.UNION
+ * 2. Has two types
+ * 3. Has a NULL type.
+ *
+ * @param schema
+ * @return
+ */
protected boolean isOption(Schema schema) {
return schema.getType().equals(Schema.Type.UNION)
&& schema.getTypes().size() == 2
@@ -260,6 +312,12 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
}
}
+ /**
+ * Method helps to calculate the number of entries to add.
+ *
+ * @param elementSchema
+ * @return Number of entries to add
+ */
private int numEntriesToAdd(Schema elementSchema) {
// Find the size of the primitive data type in bytes
int primitiveDataTypeSize = getSize(elementSchema);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadSizeEstimator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadSizeEstimator.java
index c6a8f4e..e595c26 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadSizeEstimator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadSizeEstimator.java
@@ -61,6 +61,12 @@ public class GenericRecordFullPayloadSizeEstimator implements Serializable {
return (int) size;
}
+ /**
+ * Estimate the size of a given schema according to their type.
+ *
+ * @param schema schema to estimate.
+ * @return Size of the given schema.
+ */
private long typeEstimate(Schema schema) {
Schema localSchema = schema;
if (isOption(schema)) {
@@ -112,6 +118,12 @@ public class GenericRecordFullPayloadSizeEstimator implements Serializable {
|| schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
}
+ /**
+ * Get the nonNull Schema of a given UNION Schema.
+ *
+ * @param schema
+ * @return
+ */
protected Schema getNonNull(Schema schema) {
List<Schema> types = schema.getTypes();
return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
index 685a5c7..a33ef0c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java
@@ -24,6 +24,9 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+/**
+ * A lazy update payload generator to generate {@link GenericRecord}s lazily.
+ */
public class UpdateGeneratorIterator implements Iterator<GenericRecord> {
// Use the full payload generator as default
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterAdapter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterAdapter.java
index c941458..3ce302f 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterAdapter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterAdapter.java
@@ -23,6 +23,9 @@ import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
+/**
+ * Adapter for using the Delta Writer.
+ */
public interface DeltaWriterAdapter<I> {
List<DeltaWriteStats> write(Iterator<I> input) throws IOException;