Posted to commits@hudi.apache.org by xu...@apache.org on 2022/03/17 11:18:26 UTC
[hudi] branch master updated: [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7446ff9 [HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)
7446ff9 is described below
commit 7446ff95a7b099d8016b7ca48fdeed9b290d710f
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Thu Mar 17 19:17:56 2022 +0800
[HUDI-2439] Replace RDD with HoodieData in HoodieSparkTable and commit executors (#4856)
- Adopt HoodieData in Spark action commit executors
- Make DeleteHelper, WriteHelper, and MergeHelper Spark-independent and move them into hudi-client-common
- Use the raw HoodieTable type in WriteClient APIs to decouple them from the client's generic types
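The core idea behind these changes is writing commit-executor logic against an engine-agnostic collection abstraction (HoodieData) instead of Spark's JavaRDD, so the same helper can back Spark, Flink, or plain-Java clients. The following is only a minimal illustrative sketch of that pattern; EngineData, ListData, and dropNulls are hypothetical names for this example and are not Hudi's actual HoodieData/HoodieList/HoodieJavaRDD API.

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical miniature of an engine-agnostic collection: helpers are
// written against this interface rather than a Spark JavaRDD.
interface EngineData<T> {
  <R> EngineData<R> map(Function<T, R> fn);
  EngineData<T> filter(Predicate<T> fn);
  boolean isEmpty();
  List<T> collect();
}

// A simple in-memory implementation, standing in for a List-backed engine;
// a Spark-backed implementation would wrap a JavaRDD the same way.
final class ListData<T> implements EngineData<T> {
  private final List<T> data;
  ListData(List<T> data) { this.data = data; }
  public <R> EngineData<R> map(Function<T, R> fn) {
    return new ListData<>(data.stream().map(fn).collect(Collectors.toList()));
  }
  public EngineData<T> filter(Predicate<T> fn) {
    return new ListData<>(data.stream().filter(fn).collect(Collectors.toList()));
  }
  public boolean isEmpty() { return data.isEmpty(); }
  public List<T> collect() { return data; }
}

public class EngineAgnosticHelperSketch {
  // Written once against the abstraction; with the real API the same logic
  // runs unchanged whether the records live in an RDD or a local list.
  static EngineData<String> dropNulls(EngineData<String> keys) {
    return keys.filter(Objects::nonNull).map(String::trim);
  }

  public static void main(String[] args) {
    EngineData<String> keys = new ListData<>(Arrays.asList(" a ", null, "b"));
    System.out.println(dropNulls(keys).collect()); // prints [a, b]
  }
}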
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 41 ++---
.../java/org/apache/hudi/table/HoodieTable.java | 4 +-
.../action/cluster/strategy/UpdateStrategy.java | 2 +-
.../table/action/commit/BaseBulkInsertHelper.java | 4 +-
.../table/action/commit/HoodieDeleteHelper.java} | 68 ++++----
.../table/action/commit/HoodieMergeHelper.java} | 21 ++-
.../table/action/commit/HoodieWriteHelper.java} | 40 ++---
.../apache/hudi/client/HoodieFlinkWriteClient.java | 24 +--
.../hudi/table/HoodieFlinkCopyOnWriteTable.java | 5 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 21 +--
.../hudi/table/HoodieJavaCopyOnWriteTable.java | 4 +-
.../hudi/table/HoodieJavaMergeOnReadTable.java | 2 +-
.../commit/JavaBulkInsertCommitActionExecutor.java | 6 +-
.../table/action/commit/JavaBulkInsertHelper.java | 5 +-
.../JavaBulkInsertPreppedCommitActionExecutor.java | 4 +-
.../org/apache/hudi/client/HoodieReadClient.java | 2 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 111 ++++++------
.../MultipleSparkJobExecutionStrategy.java | 33 ++--
.../SparkSingleFileSortExecutionStrategy.java | 6 +-
.../SparkSortAndSizeExecutionStrategy.java | 6 +-
.../update/strategy/BaseSparkUpdateStrategy.java | 9 +-
.../update/strategy/SparkAllowUpdateStrategy.java | 5 +-
.../update/strategy/SparkRejectUpdateStrategy.java | 4 +-
.../hudi/client/utils/SparkValidatorUtils.java | 24 +--
.../client/validator/SparkPreCommitValidator.java | 7 +-
.../SqlQueryEqualityPreCommitValidator.java | 4 +-
.../SqlQueryInequalityPreCommitValidator.java | 4 +-
.../validator/SqlQueryPreCommitValidator.java | 5 +-
.../SqlQuerySingleResultPreCommitValidator.java | 4 +-
.../org/apache/hudi/data/HoodieJavaPairRDD.java | 6 +
.../java/org/apache/hudi/data/HoodieJavaRDD.java | 22 +++
.../hudi/table/HoodieSparkCopyOnWriteTable.java | 56 +++---
.../hudi/table/HoodieSparkMergeOnReadTable.java | 54 +++---
.../org/apache/hudi/table/HoodieSparkTable.java | 13 +-
.../SparkBootstrapCommitActionExecutor.java | 53 +++---
.../SparkBootstrapDeltaCommitActionExecutor.java | 8 +-
...SparkExecuteClusteringCommitActionExecutor.java | 9 +-
.../commit/BaseSparkCommitActionExecutor.java | 105 +++++------
.../SparkBulkInsertCommitActionExecutor.java | 17 +-
.../table/action/commit/SparkBulkInsertHelper.java | 59 ++++---
...SparkBulkInsertPreppedCommitActionExecutor.java | 13 +-
.../commit/SparkDeleteCommitActionExecutor.java | 11 +-
.../SparkDeletePartitionCommitActionExecutor.java | 19 +-
.../commit/SparkInsertCommitActionExecutor.java | 11 +-
.../SparkInsertOverwriteCommitActionExecutor.java | 25 +--
...rkInsertOverwriteTableCommitActionExecutor.java | 26 ++-
.../SparkInsertPreppedCommitActionExecutor.java | 9 +-
.../commit/SparkUpsertCommitActionExecutor.java | 11 +-
.../SparkUpsertPreppedCommitActionExecutor.java | 9 +-
.../HoodieSparkMergeOnReadTableCompactor.java | 8 +-
.../BaseSparkDeltaCommitActionExecutor.java | 8 +-
.../SparkBulkInsertDeltaCommitActionExecutor.java | 17 +-
...BulkInsertPreppedDeltaCommitActionExecutor.java | 13 +-
.../SparkDeleteDeltaCommitActionExecutor.java | 13 +-
.../SparkInsertDeltaCommitActionExecutor.java | 13 +-
...parkInsertPreppedDeltaCommitActionExecutor.java | 9 +-
.../SparkUpsertDeltaCommitActionExecutor.java | 14 +-
...parkUpsertPreppedDeltaCommitActionExecutor.java | 9 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 30 +++-
.../hudi/table/TestHoodieMergeOnReadTable.java | 3 +-
.../commit/TestCopyOnWriteActionExecutor.java | 18 +-
.../hudi/table/action/commit/TestDeleteHelper.java | 194 ---------------------
.../table/action/compact/TestHoodieCompactor.java | 5 +-
.../org/apache/hudi/common/data/HoodieData.java | 8 +
.../org/apache/hudi/common/data/HoodieList.java | 26 +++
.../org/apache/hudi/common/data/HoodieMapPair.java | 11 ++
.../apache/hudi/common/data/HoodiePairData.java | 3 +
.../apache/hudi/common/data/TestHoodieList.java | 50 ++++++
.../apache/hudi/common/data/TestHoodieMapPair.java | 29 +++
69 files changed, 723 insertions(+), 769 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 9bad2e3..a6a7e18 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,8 +18,6 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -78,6 +76,9 @@ import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -242,11 +243,11 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
- protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}
- protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+ protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -397,7 +398,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsert(I records, final String instantTime,
- Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
@@ -417,7 +418,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return Collection of WriteStatus to inspect errors and counts
*/
public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instantTime,
- Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+ Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -458,7 +459,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param hoodieTable Hoodie Table
* @return Write Status
*/
- protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable<T, I, K, O> hoodieTable);
+ protected abstract O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable);
/**
* Post Commit Hook. Derived classes use this method to perform post-commit processing
@@ -468,7 +469,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime Instant Time
* @param extraMetadata Additional Metadata passed by user
*/
- protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
+ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
try {
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
@@ -480,7 +481,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
- protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -524,7 +525,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
- protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
+ protected void runAnyPendingCompactions(HoodieTable table) {
table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight compaction at instant " + instant);
@@ -532,7 +533,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
});
}
- protected void runAnyPendingClustering(HoodieTable<T, I, K, O> table) {
+ protected void runAnyPendingClustering(HoodieTable table) {
table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
if (instantPlan.isPresent()) {
@@ -558,7 +559,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
}
}
- protected void autoArchiveOnCommit(HoodieTable<T, I, K, O> table) {
+ protected void autoArchiveOnCommit(HoodieTable table) {
if (!config.isAutoArchive()) {
return;
}
@@ -808,7 +809,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* and keep increasing unbounded over time.
* @param table table to commit on.
*/
- protected void archive(HoodieTable<T, I, K, O> table) {
+ protected void archive(HoodieTable table) {
if (!tableServicesEnabled(config)) {
return;
}
@@ -937,7 +938,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/**
* Commit Compaction and track metrics.
*/
- protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable<T, I, K, O> table, String compactionCommitTime);
+ protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime);
/**
* Get inflight time line exclude compaction and clustering.
@@ -1223,7 +1224,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return scheduleClustering(extraMetadata);
}
- protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
+ protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
@@ -1238,7 +1239,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime Instant Time
* @param stats Hoodie Write Stat
*/
- protected void finalizeWrite(HoodieTable<T, I, K, O> table, String instantTime, List<HoodieWriteStat> stats) {
+ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieWriteStat> stats) {
try {
final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
table.finalizeWrite(context, instantTime, stats);
@@ -1273,7 +1274,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
- protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
+ protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
@@ -1288,14 +1289,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* <li>Initializing metrics contexts</li>
* </ul>
*/
- protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
+ protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}
- HoodieTable<T, I, K, O> table;
+ HoodieTable table;
this.txnManager.beginTransaction();
try {
@@ -1381,7 +1382,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.txnManager.close();
}
- private void setWriteTimer(HoodieTable<T, I, K, O> table) {
+ private void setWriteTimer(HoodieTable table) {
String commitType = table.getMetaClient().getCommitActionType();
if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index e5262ad..62a4f08 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -180,7 +180,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
- I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+ I records, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied instantTime {@link HoodieKey}s will be
@@ -237,7 +237,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
- I preppedRecords, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+ I preppedRecords, Option<BulkInsertPartitioner> bulkInsertPartitioner);
/**
* Replaces all the existing records and inserts the specified new records into Hoodie table at the supplied instantTime,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 0097908..4e33eb0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -28,7 +28,7 @@ import java.util.Set;
/**
* When file groups in clustering, write records to these file group need to check.
*/
-public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
+public abstract class UpdateStrategy<T extends HoodieRecordPayload, I> {
protected final HoodieEngineContext engineContext;
protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
index dffd926..ad2145c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseBulkInsertHelper.java
@@ -34,7 +34,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract HoodieWriteMetadata<O> bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
BaseCommitActionExecutor<T, I, K, O, R> executor, boolean performDedupe,
- Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner);
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner);
/**
* Only write input records. Does not change timeline/index. Return information about new files created.
@@ -42,7 +42,7 @@ public abstract class BaseBulkInsertHelper<T extends HoodieRecordPayload, I, K,
public abstract O bulkInsert(I inputRecords, String instantTime,
HoodieTable<T, I, K, O> table, HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner<I>> userDefinedBulkInsertPartitioner,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
boolean addMetadataFields,
int parallelism,
WriteHandleFactory writeHandleFactory);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
similarity index 53%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 381c115..fff52eb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -7,19 +7,20 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -28,16 +29,12 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
@@ -48,69 +45,64 @@ import java.util.HashMap;
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
-public class SparkDeleteHelper<T extends HoodieRecordPayload,R> extends
- BaseDeleteHelper<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
- private SparkDeleteHelper() {
+public class HoodieDeleteHelper<T extends HoodieRecordPayload, R> extends
+ BaseDeleteHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
+ private HoodieDeleteHelper() {
}
private static class DeleteHelperHolder {
- private static final SparkDeleteHelper SPARK_DELETE_HELPER = new SparkDeleteHelper();
+ private static final HoodieDeleteHelper HOODIE_DELETE_HELPER = new HoodieDeleteHelper<>();
}
- public static SparkDeleteHelper newInstance() {
- return DeleteHelperHolder.SPARK_DELETE_HELPER;
+ public static HoodieDeleteHelper newInstance() {
+ return DeleteHelperHolder.HOODIE_DELETE_HELPER;
}
@Override
- public JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, int parallelism) {
+ public HoodieData<HoodieKey> deduplicateKeys(HoodieData<HoodieKey> keys, HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table, int parallelism) {
boolean isIndexingGlobal = table.getIndex().isGlobal();
if (isIndexingGlobal) {
- return keys.keyBy(HoodieKey::getRecordKey)
- .reduceByKey((key1, key2) -> key1, parallelism)
- .values();
+ return keys.distinctWithKey(HoodieKey::getRecordKey, parallelism);
} else {
return keys.distinct(parallelism);
}
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(String instantTime,
- JavaRDD<HoodieKey> keys,
- HoodieEngineContext context,
- HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> deleteExecutor) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
+ HoodieData<HoodieKey> keys,
+ HoodieEngineContext context,
+ HoodieWriteConfig config,
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+ BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
try {
- HoodieWriteMetadata result = null;
- JavaRDD<HoodieKey> dedupedKeys = keys;
+ HoodieData<HoodieKey> dedupedKeys = keys;
final int parallelism = config.getDeleteShuffleParallelism();
if (config.shouldCombineBeforeDelete()) {
// De-dupe/merge if needed
dedupedKeys = deduplicateKeys(keys, table, parallelism);
- } else if (!keys.partitions().isEmpty()) {
+ } else if (!keys.isEmpty()) {
dedupedKeys = keys.repartition(parallelism);
}
- JavaRDD<HoodieRecord<T>> dedupedRecords =
+ HoodieData<HoodieRecord<T>> dedupedRecords =
dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
Instant beginTag = Instant.now();
// perform index loop up to get existing location of records
- JavaRDD<HoodieRecord<T>> taggedRecords = HoodieJavaRDD.getJavaRDD(
- table.getIndex().tagLocation(HoodieJavaRDD.of(dedupedRecords), context, table));
+ HoodieData<HoodieRecord<T>> taggedRecords = table.getIndex().tagLocation(dedupedRecords, context, table);
Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
// filter out non existent keys/records
- JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+ HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result;
if (!taggedValidRecords.isEmpty()) {
result = deleteExecutor.execute(taggedValidRecords);
result.setIndexLookupDuration(tagLocationDuration);
} else {
// if entire set of keys are non existent
deleteExecutor.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
- result = new HoodieWriteMetadata();
- result.setWriteStatuses(jsc.emptyRDD());
+ result = new HoodieWriteMetadata<>();
+ result.setWriteStatuses(context.emptyHoodieData());
deleteExecutor.commitOnAutoCommit(result);
}
return result;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
similarity index 81%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index e87c3ef..2b4a5d1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -37,31 +38,29 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
import java.util.Iterator;
-public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, JavaRDD<HoodieRecord<T>>,
- JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
+ BaseMergeHelper<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
- private SparkMergeHelper() {
+ private HoodieMergeHelper() {
}
private static class MergeHelperHolder {
- private static final SparkMergeHelper SPARK_MERGE_HELPER = new SparkMergeHelper();
+ private static final HoodieMergeHelper HOODIE_MERGE_HELPER = new HoodieMergeHelper<>();
}
- public static SparkMergeHelper newInstance() {
- return SparkMergeHelper.MergeHelperHolder.SPARK_MERGE_HELPER;
+ public static HoodieMergeHelper newInstance() {
+ return MergeHelperHolder.HOODIE_MERGE_HELPER;
}
@Override
- public void runMerge(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- HoodieMergeHandle<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> upsertHandle) throws IOException {
+ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+ HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
- HoodieMergeHandle<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> mergeHandle = upsertHandle;
HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
final GenericDatumWriter<GenericRecord> gWriter;
@@ -78,7 +77,7 @@ public class SparkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHe
}
BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
- HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+ HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try {
final Iterator<GenericRecord> readerIterator;
if (baseFile.getBootstrapBaseFile().isPresent()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
similarity index 60%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 23dceb1..b56d39b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -19,60 +19,52 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaRDD;
+public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWriteHelper<T, HoodieData<HoodieRecord<T>>,
+ HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
-import scala.Tuple2;
-
-/**
- * A spark implementation of {@link BaseWriteHelper}.
- *
- * @param <T>
- */
-public class SparkWriteHelper<T extends HoodieRecordPayload,R> extends BaseWriteHelper<T, JavaRDD<HoodieRecord<T>>,
- JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
- private SparkWriteHelper() {
+ private HoodieWriteHelper() {
}
private static class WriteHelperHolder {
- private static final SparkWriteHelper SPARK_WRITE_HELPER = new SparkWriteHelper();
+ private static final HoodieWriteHelper HOODIE_WRITE_HELPER = new HoodieWriteHelper<>();
}
- public static SparkWriteHelper newInstance() {
- return WriteHelperHolder.SPARK_WRITE_HELPER;
+ public static HoodieWriteHelper newInstance() {
+ return WriteHelperHolder.HOODIE_WRITE_HELPER;
}
@Override
- protected JavaRDD<HoodieRecord<T>> tag(JavaRDD<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table) {
- return HoodieJavaRDD.getJavaRDD(
- table.getIndex().tagLocation(HoodieJavaRDD.of(dedupedRecords), context, table));
+ protected HoodieData<HoodieRecord<T>> tag(HoodieData<HoodieRecord<T>> dedupedRecords, HoodieEngineContext context,
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table) {
+ return table.getIndex().tagLocation(dedupedRecords, context, table);
}
@Override
- public JavaRDD<HoodieRecord<T>> deduplicateRecords(
- JavaRDD<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+ public HoodieData<HoodieRecord<T>> deduplicateRecords(
+ HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
boolean isIndexingGlobal = index.isGlobal();
return records.mapToPair(record -> {
HoodieKey hoodieKey = record.getKey();
// If index used is global, then records are expected to differ in their partitionPath
Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
- return new Tuple2<>(key, record);
+ return Pair.of(key, record);
}).reduceByKey((rec1, rec2) -> {
@SuppressWarnings("unchecked")
T reducedData = (T) rec2.getData().preCombine(rec1.getData());
HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
- return new HoodieAvroRecord<T>(reducedKey, reducedData);
- }, parallelism).map(Tuple2::_2);
+ return new HoodieAvroRecord<>(reducedKey, reducedData);
+ }, parallelism).map(Pair::getRight);
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index fb61330..4523705 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,8 +18,6 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieList;
@@ -55,6 +53,7 @@ import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -64,6 +63,9 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,8 +113,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration hadoopConf,
- boolean refreshTimeline) {
+ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
+ boolean refreshTimeline) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}
@@ -226,12 +228,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
}
@Override
- public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
}
@@ -304,7 +306,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+ HoodieTable hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
@@ -324,7 +326,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* @param extraMetadata Additional Metadata passed by user
*/
@Override
- protected void postCommit(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ protected void postCommit(HoodieTable table,
HoodieCommitMetadata metadata,
String instantTime,
Option<Map<String, String>> extraMetadata) {
@@ -351,7 +353,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public void completeCompaction(
HoodieCommitMetadata metadata,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ HoodieTable table,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
@@ -363,7 +365,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
- w -> w.update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
+ w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
@@ -396,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 7e41ab1..14937d6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -68,6 +68,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -231,7 +232,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsert is not supported yet");
}
@@ -264,7 +265,7 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> preppedRecords,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsertPrepped is not supported yet");
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 9de9298..faf46e0 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,8 +18,6 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieList;
@@ -43,6 +41,9 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.upgrade.JavaUpgradeDowngradeHelper;
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -88,9 +89,9 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> createTable(HoodieWriteConfig config,
- Configuration hadoopConf,
- boolean refreshTimeline) {
+ protected HoodieTable createTable(HoodieWriteConfig config,
+ Configuration hadoopConf,
+ boolean refreshTimeline) {
return HoodieJavaTable.create(config, context);
}
@@ -152,7 +153,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records,
String instantTime,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
throw new HoodieNotSupportedException("BulkInsert is not supported in HoodieJavaClient");
}
@@ -166,7 +167,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords,
String instantTime,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
@@ -188,7 +189,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result,
String instantTime,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
+ HoodieTable hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
@@ -215,7 +216,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
@Override
protected void completeCompaction(HoodieCommitMetadata metadata,
- HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+ HoodieTable table,
String compactionCommitTime) {
throw new HoodieNotSupportedException("CompleteCompaction is not supported in HoodieJavaClient");
}
@@ -232,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
// Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 447ed3e..06c2304 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -114,7 +114,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
public HoodieWriteMetadata<List<WriteStatus>> bulkInsert(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> records,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
return new JavaBulkInsertCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, records, bulkInsertPartitioner).execute();
}
@@ -152,7 +152,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload>
public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> preppedRecords,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 136c25b..32d30f7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -61,7 +61,7 @@ public class HoodieJavaMergeOnReadTable<T extends HoodieRecordPayload> extends H
public HoodieWriteMetadata<List<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context,
String instantTime,
List<HoodieRecord<T>> preppedRecords,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
return new JavaBulkInsertPreppedCommitActionExecutor((HoodieJavaEngineContext) context, config,
this, instantTime, preppedRecords, bulkInsertPartitioner).execute();
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
index 9780262..d5c7a0b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertCommitActionExecutor.java
@@ -36,17 +36,17 @@ import java.util.Map;
public class JavaBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> inputRecords;
- private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecords, bulkInsertPartitioner, Option.empty());
}
public JavaBulkInsertCommitActionExecutor(HoodieJavaEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> inputRecords,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> bulkInsertPartitioner,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecords = inputRecords;
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
index de7afdf..30f1d93 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertHelper.java
@@ -65,7 +65,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
final HoodieWriteConfig config,
final BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, R> executor,
final boolean performDedupe,
- final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
// It's possible the transition to inflight could have already happened.
@@ -89,7 +89,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieWriteConfig config,
boolean performDedupe,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
boolean useWriterSchema,
int parallelism,
WriteHandleFactory writeHandleFactory) {
@@ -106,6 +106,7 @@ public class JavaBulkInsertHelper<T extends HoodieRecordPayload, R> extends Base
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: JavaBulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
+ // only List is supported for Java partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
repartitionedRecords = (List<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
FileIdPrefixProvider fileIdPrefixProvider = (FileIdPrefixProvider) ReflectionUtils.loadClass(
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
index ed72fbe..14c4c8a 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBulkInsertPreppedCommitActionExecutor.java
@@ -36,12 +36,12 @@ public class JavaBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPay
extends BaseJavaCommitActionExecutor<T> {
private final List<HoodieRecord<T>> preppedInputRecord;
- private final Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
+ private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
public JavaBulkInsertPreppedCommitActionExecutor(HoodieJavaEngineContext context,
HoodieWriteConfig config, HoodieTable table,
String instantTime, List<HoodieRecord<T>> preppedInputRecord,
- Option<BulkInsertPartitioner<List<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecord = preppedInputRecord;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index e9bdc42..37a78a4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -68,7 +68,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload<T>> implements Seria
* base path pointing to the table. Until, then just always assume a BloomIndex
*/
private final transient HoodieIndex<?, ?> index;
- private HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable;
+ private HoodieTable hoodieTable;
private transient Option<SQLContext> sqlContextOpt;
private final transient HoodieSparkEngineContext context;
private final transient Configuration hadoopConf;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ec1ecd6..ac9259c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,11 +18,10 @@
package org.apache.hudi.client;
-import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.utils.TransactionUtils;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.metrics.Registry;
@@ -45,6 +44,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -54,6 +54,9 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
@@ -120,9 +123,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> createTable(HoodieWriteConfig config,
- Configuration hadoopConf,
- boolean refreshTimeline) {
+ protected HoodieTable createTable(HoodieWriteConfig config,
+ Configuration hadoopConf,
+ boolean refreshTimeline) {
return HoodieSparkTable.create(config, context, refreshTimeline);
}
@@ -147,45 +150,49 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
- return postWrite(result, instantTime, table);
+ return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
table.validateUpsertSchema();
preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsertPrepped(context,instantTime, preppedRecords);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context,instantTime, HoodieJavaRDD.of(preppedRecords));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insert(context,instantTime, records);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insert(context,instantTime, HoodieJavaRDD.of(records));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.insertPrepped(context,instantTime, preppedRecords);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertPrepped(context,instantTime, HoodieJavaRDD.of(preppedRecords));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
/**
@@ -196,11 +203,12 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
- HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
- HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records);
- return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertOverwrite(context, instantTime, HoodieJavaRDD.of(records));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
}
/**
@@ -211,11 +219,12 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
* @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
*/
public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> records, final String instantTime) {
- HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
- HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records);
- return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insertOverwriteTable(context, instantTime, HoodieJavaRDD.of(records));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
}
@Override
@@ -224,44 +233,48 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.bulkInsert(context,instantTime, HoodieJavaRDD.of(records), userDefinedBulkInsertPartitioner);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
@Override
- public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table =
+ public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime));
table.validateInsertSchema();
preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.bulkInsertPrepped(context,instantTime, HoodieJavaRDD.of(preppedRecords), bulkInsertPartitioner);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
@Override
public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.delete(context,instantTime, keys);
- return postWrite(result, instantTime, table);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.delete(context,instantTime, HoodieJavaRDD.of(keys));
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return postWrite(resultRDD, instantTime, table);
}
public HoodieWriteResult deletePartitions(List<String> partitions, String instantTime) {
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
- return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds());
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
+ return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), result.getPartitionToReplaceFileIds());
}
@Override
protected JavaRDD<WriteStatus> postWrite(HoodieWriteMetadata<JavaRDD<WriteStatus>> result,
String instantTime,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> hoodieTable) {
+ HoodieTable hoodieTable) {
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
}
@@ -288,7 +301,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
@Override
protected void completeCompaction(HoodieCommitMetadata metadata,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+ HoodieTable table,
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
@@ -329,8 +342,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
table.getMetaClient().reloadActiveTimeline();
}
compactionTimer = metrics.getCompactionCtx();
- HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata =
- table.compact(context, compactionInstantTime);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.compact(context, compactionInstantTime);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime);
}
@@ -349,7 +362,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
clusteringTimer = metrics.getClusteringCtx();
LOG.info("Starting clustering at " + clusteringInstant);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = table.cluster(context, clusteringInstant);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = table.cluster(context, clusteringInstant);
+ HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses()));
// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
@@ -358,9 +372,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
private void completeClustering(HoodieReplaceCommitMetadata metadata,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- String clusteringCommitTime) {
-
+ HoodieTable table,
+ String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());
@@ -405,17 +418,17 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
- private void updateTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
+ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp())
- .ifPresent(writer -> writer.update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
+ .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction));
}
@Override
- protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
@@ -440,7 +453,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
// TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+ HoodieTable table,
String commitInstant) {
switch (tableServiceType) {
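
The pattern above repeats in every write path of SparkRDDWriteClient: the public client API keeps returning JavaRDD<WriteStatus>, while the table and commit executors now speak HoodieData. Below is a minimal sketch of that bridging idiom, assuming the table, engine context and records are supplied by the caller; the class and method names are illustrative only and are not part of this commit.

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;

public class HoodieDataBridgeSketch {
  public static <T extends HoodieRecordPayload> JavaRDD<WriteStatus> upsertSketch(
      HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
      HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
    // Wrap the Spark-specific RDD into the engine-agnostic HoodieData before calling the table API.
    HoodieWriteMetadata<HoodieData<WriteStatus>> result =
        table.upsert(context, instantTime, HoodieJavaRDD.of(records));
    // Re-materialize a JavaRDD-typed view of the metadata for callers that still expect Spark types.
    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
        result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
    return resultRDD.getWriteStatuses();
  }
}
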
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 54c1c9f..5a03cdf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,10 +18,6 @@
package org.apache.hudi.client.clustering.run.strategy;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -62,6 +58,11 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -94,7 +95,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
// execute clustering for each group async and collect WriteStatus
- Stream<JavaRDD<WriteStatus>> writeStatusRDDStream = FutureUtils.allOf(
+ Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
clusteringPlan.getInputGroups().stream()
.map(inputGroup -> runClusteringForGroupAsync(inputGroup,
clusteringPlan.getStrategy().getStrategyParams(),
@@ -103,7 +104,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
.collect(Collectors.toList()))
.join()
.stream();
- JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusRDDStream);
+ JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
@@ -125,7 +126,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
* @param preserveHoodieMetadata Whether to preserve commit metadata while clustering.
* @return RDD of {@link WriteStatus}.
*/
- public abstract JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
+ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
@@ -164,11 +165,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
/**
* Submit job to execute clustering for the group.
*/
- private CompletableFuture<JavaRDD<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
+ private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsync(HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
boolean preserveHoodieMetadata, String instantTime) {
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
- JavaRDD<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
+ HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
.map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
@@ -180,7 +181,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
/**
* Get RDD of all records for the group. This includes all records from file slice (Apply updates from log files, if any).
*/
- private JavaRDD<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
+ private HoodieData<HoodieRecord<T>> readRecordsForGroup(JavaSparkContext jsc, HoodieClusteringGroup clusteringGroup, String instantTime) {
List<ClusteringOperation> clusteringOps = clusteringGroup.getSlices().stream().map(ClusteringOperation::create).collect(Collectors.toList());
boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
if (hasLogFiles) {
@@ -195,12 +196,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
/**
* Read records from baseFiles, apply updates and convert to RDD.
*/
- private JavaRDD<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
+ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps,
String instantTime) {
HoodieWriteConfig config = getWriteConfig();
HoodieTable table = getHoodieTable();
- return jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
+ return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size()).mapPartitions(clusteringOpsPartition -> {
List<Iterator<HoodieRecord<T>>> recordIterators = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(new SparkTaskContextSupplier(), config);
@@ -237,20 +238,20 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
});
return new ConcatenatingIterator<>(recordIterators);
- });
+ }));
}
/**
* Read records from baseFiles and convert to RDD.
*/
- private JavaRDD<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
+ private HoodieData<HoodieRecord<T>> readRecordsForGroupBaseFiles(JavaSparkContext jsc,
List<ClusteringOperation> clusteringOps) {
SerializableConfiguration hadoopConf = new SerializableConfiguration(getHoodieTable().getHadoopConf());
HoodieWriteConfig writeConfig = getWriteConfig();
// NOTE: It's crucial to make sure that we don't capture whole "this" object into the
// closure, as this might lead to issues attempting to serialize its nested fields
- return jsc.parallelize(clusteringOps, clusteringOps.size())
+ return HoodieJavaRDD.of(jsc.parallelize(clusteringOps, clusteringOps.size())
.mapPartitions(clusteringOpsPartition -> {
List<Iterator<IndexedRecord>> iteratorsForPartition = new ArrayList<>();
clusteringOpsPartition.forEachRemaining(clusteringOp -> {
@@ -266,7 +267,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
return new ConcatenatingIterator<>(iteratorsForPartition);
})
- .map(record -> transform(record, writeConfig));
+ .map(record -> transform(record, writeConfig)));
}
/**
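
With this change, Spark types reappear only at the very end of performClustering(...), where each group's HoodieData<WriteStatus> is unwrapped and unioned. A small sketch of that final step, assuming the per-group results have already been collected into a list (names are placeholders):

import java.util.List;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ClusteringUnionSketch {
  @SuppressWarnings("unchecked")
  static JavaRDD<WriteStatus> unionGroupResults(JavaSparkContext jsc, List<HoodieData<WriteStatus>> groupResults) {
    // Unwrap each group's HoodieData back to its backing JavaRDD ...
    JavaRDD<WriteStatus>[] statusRDDs = groupResults.stream()
        .map(HoodieJavaRDD::getJavaRDD)
        .toArray(JavaRDD[]::new);
    // ... and union them into the single RDD that backs the returned HoodieWriteMetadata.
    return jsc.union(statusRDDs);
  }
}
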
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 2a50393..4dedaba 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -20,6 +20,7 @@
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -34,7 +35,6 @@ import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.Map;
@@ -56,7 +56,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
}
@Override
- public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(JavaRDD<HoodieRecord<T>> inputRecords,
+ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords,
int numOutputGroups,
String instantTime,
Map<String, String> strategyParams,
@@ -74,7 +74,7 @@ public class SparkSingleFileSortExecutionStrategy<T extends HoodieRecordPayload<
// Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(Long.MAX_VALUE));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
- return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
+ return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
false, getPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), preserveHoodieMetadata));
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 22d5300..d664c83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
@@ -32,7 +33,6 @@ import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.avro.Schema;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import java.util.List;
import java.util.Map;
@@ -54,7 +54,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
}
@Override
- public JavaRDD<WriteStatus> performClusteringWithRecordsRDD(final JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups,
+ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<HoodieRecord<T>> inputRecords, final int numOutputGroups,
final String instantTime, final Map<String, String> strategyParams, final Schema schema,
final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
@@ -64,7 +64,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
- return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance()
+ return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance()
.bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig, false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(preserveHoodieMetadata));
}
}
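
Custom clustering execution strategies plug in at the same point; only the types changed. A bare skeleton of the new override shape follows (the class below is hypothetical and deliberately empty; a real strategy extends MultipleSparkJobExecutionStrategy and, like the two classes above, typically delegates to SparkBulkInsertHelper):

import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;

// Hypothetical skeleton; only the method shape mirrors the abstract contract above.
public abstract class CustomClusteringStrategySketch<T> {
  public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(
      HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime,
      Map<String, String> strategyParams, Schema schema,
      List<HoodieFileGroupId> fileGroupIdList, boolean preserveHoodieMetadata);
}
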
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
index 655c119..3eadba2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/BaseSparkUpdateStrategy.java
@@ -19,13 +19,12 @@
package org.apache.hudi.client.clustering.update.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
-import org.apache.spark.api.java.JavaRDD;
-
import java.util.List;
import java.util.Set;
@@ -33,7 +32,7 @@ import java.util.Set;
* Spark base update strategy, write records to the file groups which are in clustering
* need to check. Spark relate implementations should extend this base class.
*/
-public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>> extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
public BaseSparkUpdateStrategy(HoodieSparkEngineContext engineContext,
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
@@ -45,9 +44,9 @@ public abstract class BaseSparkUpdateStrategy<T extends HoodieRecordPayload<T>>
* @param inputRecords the records to write, tagged with target file id
* @return the records matched file group ids
*/
- protected List<HoodieFileGroupId> getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+ protected List<HoodieFileGroupId> getGroupIdsWithUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
return inputRecords
.filter(record -> record.getCurrentLocation() != null)
- .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collect();
+ .map(record -> new HoodieFileGroupId(record.getPartitionPath(), record.getCurrentLocation().getFileId())).distinct().collectAsList();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
index 92a5fb6..5904062 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkAllowUpdateStrategy.java
@@ -19,13 +19,12 @@
package org.apache.hudi.client.clustering.update.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.spark.api.java.JavaRDD;
-
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -42,7 +41,7 @@ public class SparkAllowUpdateStrategy<T extends HoodieRecordPayload<T>> extends
}
@Override
- public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+ public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
Set<HoodieFileGroupId> fileGroupIdsWithUpdatesAndPendingClustering = fileGroupIdsWithRecordUpdate.stream()
.filter(f -> fileGroupsInPendingClustering.contains(f))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
index ac058a4..d09422e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.clustering.update.strategy;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -27,7 +28,6 @@ import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import java.util.Collections;
import java.util.HashSet;
@@ -47,7 +47,7 @@ public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> extends
}
@Override
- public Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(JavaRDD<HoodieRecord<T>> taggedRecordsRDD) {
+ public Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> handleUpdate(HoodieData<HoodieRecord<T>> taggedRecordsRDD) {
List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = getGroupIdsWithUpdate(taggedRecordsRDD);
fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) {
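
Both update strategies now take and return HoodieData, so the pending-clustering conflict check is expressed once, independent of the engine. A usage sketch, assuming the strategy instance has already been created (for example from the configured clustering update strategy) and the tagged records come from the index lookup:

import java.util.Set;
import org.apache.hudi.client.clustering.update.strategy.BaseSparkUpdateStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.spark.api.java.JavaRDD;

public class UpdateStrategySketch {
  static <T extends HoodieRecordPayload<T>> Set<HoodieFileGroupId> blockedFileGroups(
      BaseSparkUpdateStrategy<T> strategy, JavaRDD<HoodieRecord<T>> taggedRecords) {
    // handleUpdate(...) takes HoodieData and reports the file groups whose updates
    // collide with pending clustering (the reject strategy raises an exception instead).
    Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> result =
        strategy.handleUpdate(HoodieJavaRDD.of(taggedRecords));
    return result.getRight();
  }
}
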
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 9e72390..fd083f2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -21,7 +21,9 @@ package org.apache.hudi.client.utils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -34,13 +36,11 @@ import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -61,7 +61,7 @@ public class SparkValidatorUtils {
* Throw error if there are validation failures.
*/
public static void runValidators(HoodieWriteConfig config,
- HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
HoodieEngineContext context,
HoodieTable table,
String instantTime) {
@@ -69,10 +69,10 @@ public class SparkValidatorUtils {
LOG.info("no validators configured.");
} else {
if (!writeMetadata.getWriteStats().isPresent()) {
- writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collect());
+ writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
}
- Set<String> partitionsModified = new HashSet<>(writeMetadata.getWriteStats().get().stream().map(writeStats ->
- writeStats.getPartitionPath()).collect(Collectors.toList()));
+ Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream().map(writeStats ->
+ writeStats.getPartitionPath()).collect(Collectors.toSet());
SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
// Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback)
table.getMetaClient().reloadActiveTimeline();
@@ -101,8 +101,8 @@ public class SparkValidatorUtils {
/**
* Run validators in a separate thread pool for parallelism. Each of validator can submit a distributed spark job if needed.
*/
- private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata,
- Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
+ private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
+ Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
return CompletableFuture.supplyAsync(() -> {
try {
validator.validate(instantTime, writeMetadata, beforeState, afterState);
@@ -120,10 +120,10 @@ public class SparkValidatorUtils {
* Note that this only works for COW tables.
*/
public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext sqlContext,
- Set<String> partitionsAffected, HoodieTable table) {
+ Set<String> partitionsAffected, HoodieTable table) {
List<String> committedFiles = partitionsAffected.stream()
- .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(bf -> bf.getPath()))
+ .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(BaseFile::getPath))
.collect(Collectors.toList());
if (committedFiles.isEmpty()) {
@@ -145,7 +145,7 @@ public class SparkValidatorUtils {
*/
public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext,
Set<String> partitionsAffected,
- HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
+ HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
HoodieTable table,
String instantTime) {
@@ -157,7 +157,7 @@ public class SparkValidatorUtils {
instantTime);
List<String> newFiles = partitionsAffected.stream()
- .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(bf -> bf.getPath()))
+ .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(BaseFile::getPath))
.collect(Collectors.toList());
if (newFiles.isEmpty()) {
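
Callers of the validator utilities (the Spark commit action executors) now hand over the HoodieData-typed metadata directly. A call sketch using the signature shown above, assuming all arguments are provided by the executor:

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

public class ValidatorInvocationSketch {
  static void validate(HoodieWriteConfig config, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
                       HoodieEngineContext context, HoodieTable table, String instantTime) {
    // Write stats are derived via collectAsList() on the HoodieData when not already present (see above).
    SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
  }
}
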
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
index f12d337..f08d11b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -28,9 +29,9 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -41,7 +42,7 @@ import java.util.stream.Collectors;
/**
* Validator can be configured pre-commit.
*/
-public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> {
+public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> {
private static final Logger LOG = LogManager.getLogger(SparkPreCommitValidator.class);
private HoodieSparkTable<T> table;
@@ -59,7 +60,7 @@ public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I,
if (writeResult.getWriteStats().isPresent()) {
partitionsModified = writeResult.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
} else {
- partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collect());
+ partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collectAsList());
}
return partitionsModified;
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
index b27f84e..2506d52 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SQLContext;
*
* Expects both queries to return same result.
*/
-public class SqlQueryEqualityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQueryEqualityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryEqualityPreCommitValidator.class);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
index 026334f..8a25150 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -40,7 +40,7 @@ import org.apache.spark.sql.SQLContext;
* <p>
* Expects query results do not match.
*/
-public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
public SqlQueryInequalityPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
index 122cf2b..3a88d54 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java
@@ -20,15 +20,16 @@ package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -41,7 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* Validator framework to run sql queries and compare table state at different locations.
*/
-public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
+public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryPreCommitValidator.class);
private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
index 66e956d..b194224 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
@@ -28,7 +29,6 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@@ -40,7 +40,7 @@ import java.util.List;
* <p>
* Example configuration: "query1#expectedResult1;query2#expectedResult2;"
*/
-public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
+public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
public SqlQuerySingleResultPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
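
Across the validator hierarchy the four type parameters are kept, but O is now bounded by HoodieData<WriteStatus>. A hedged instantiation sketch follows; in practice these validators are created reflectively from the pre-commit validator config, and the explicit generics below only illustrate the new bound:

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;

public class ValidatorTypesSketch<T extends HoodieRecordPayload> {
  SqlQuerySingleResultPreCommitValidator<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>
      build(HoodieSparkTable<T> table, HoodieEngineContext context, HoodieWriteConfig config) {
    // O is bound to HoodieData<WriteStatus> rather than JavaRDD<WriteStatus>.
    return new SqlQuerySingleResultPreCommitValidator<>(table, context, config);
  }
}
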
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index ffa1a35..ddcaaec 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -21,6 +21,7 @@ package org.apache.hudi.data;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
@@ -104,6 +105,11 @@ public class HoodieJavaPairRDD<K, V> extends HoodiePairData<K, V> {
}
@Override
+ public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {
+ return HoodieJavaPairRDD.of(pairRDDData.reduceByKey(func::apply, parallelism));
+ }
+
+ @Override
public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
return HoodieJavaRDD.of(pairRDDData.map(
tuple -> func.apply(new ImmutablePair<>(tuple._1, tuple._2))));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 1381ea8..66edf60 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -131,6 +131,23 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
}
@Override
+ public HoodieData<T> distinct(int parallelism) {
+ return HoodieJavaRDD.of(rddData.distinct(parallelism));
+ }
+
+ @Override
+ public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
+ return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+ .reduceByKey((value1, value2) -> value1, parallelism)
+ .values();
+ }
+
+ @Override
+ public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+ return HoodieJavaRDD.of(rddData.filter(filterFunc::apply));
+ }
+
+ @Override
public HoodieData<T> union(HoodieData<T> other) {
return HoodieJavaRDD.of(rddData.union((JavaRDD<T>) other.get()));
}
@@ -139,4 +156,9 @@ public class HoodieJavaRDD<T> extends HoodieData<T> {
public List<T> collectAsList() {
return rddData.collect();
}
+
+ @Override
+ public HoodieData<T> repartition(int parallelism) {
+ return HoodieJavaRDD.of(rddData.repartition(parallelism));
+ }
}
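
The methods added to HoodieJavaRDD above are what let callers such as BaseSparkUpdateStrategy stay entirely on the HoodieData API. A small usage sketch, assuming a JavaSparkContext and a list of strings as stand-in data:

import java.util.Arrays;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HoodieJavaRDDSketch {
  static void example(JavaSparkContext jsc) {
    HoodieData<String> data = HoodieJavaRDD.of(jsc.parallelize(Arrays.asList("a", "b", "a", "c"), 2));
    // filter(...) and distinct(parallelism) delegate to the underlying RDD.
    HoodieData<String> unique = data
        .filter(s -> !s.isEmpty())
        .distinct(2);
    // distinctWithKey(...) dedupes by a derived key via mapToPair + reduceByKey + values.
    HoodieData<String> uniqueByFirstChar = data.distinctWithKey(s -> s.substring(0, 1), 2);
    // repartition(...) simply repartitions the wrapped RDD before collecting.
    System.out.println(unique.repartition(4).collectAsList());
    System.out.println(uniqueByFirstChar.collectAsList());
  }
}
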
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 31bd436..8f52112 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
@@ -61,6 +62,7 @@ import org.apache.hudi.table.action.clean.CleanActionExecutor;
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.cluster.SparkExecuteClusteringCommitActionExecutor;
+import org.apache.hudi.table.action.commit.HoodieMergeHelper;
import org.apache.hudi.table.action.commit.SparkBulkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkBulkInsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
@@ -69,7 +71,6 @@ import org.apache.hudi.table.action.commit.SparkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertOverwriteTableCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkInsertPreppedCommitActionExecutor;
-import org.apache.hudi.table.action.commit.SparkMergeHelper;
import org.apache.hudi.table.action.commit.SparkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.SparkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.restore.CopyOnWriteRestoreActionExecutor;
@@ -82,7 +83,6 @@ import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
import javax.annotation.Nonnull;
@@ -117,58 +117,58 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> insert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> insert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkInsertCommitActionExecutor<>((HoodieSparkEngineContext)context, config, this, instantTime, records).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
- return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, config,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+ return new SparkBulkInsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext context, String instantTime, JavaRDD<HoodieKey> keys) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext context, String instantTime, HoodieData<HoodieKey> keys) {
return new SparkDeleteCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
}
@Override
- public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
- return new SparkDeletePartitionCommitActionExecutor(context, config, this, instantTime, partitions).execute();
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
+ return new SparkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords) {
return new SparkUpsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords) {
return new SparkInsertPreppedCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new SparkBulkInsertPreppedCommitActionExecutor((HoodieSparkEngineContext) context, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@Override
- public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata insertOverwrite(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkInsertOverwriteCommitActionExecutor(context, config, this, instantTime, records).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> insertOverwriteTable(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute();
}
@@ -235,7 +235,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table");
}
@@ -248,20 +248,20 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(HoodieEngineContext context,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,
String clusteringInstantTime) {
return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
@Override
- public HoodieBootstrapWriteMetadata<JavaRDD<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
+ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
}
@Override
public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
- new CopyOnWriteRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+ new CopyOnWriteRestoreActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
@Override
@@ -292,7 +292,7 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- SparkMergeHelper.newInstance().runMerge(this, upsertHandle);
+ HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);
}
// TODO(vc): This needs to be revisited
@@ -336,28 +336,28 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
- return new CleanActionExecutor(context, config, this, cleanInstantTime, skipLocking).execute();
+ return new CleanActionExecutor<>(context, config, this, cleanInstantTime, skipLocking).execute();
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
- return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant,
+ return new CopyOnWriteRollbackActionExecutor<>(context, config, this, rollbackInstantTime, commitInstant,
deleteInstants, skipLocking).execute();
}
@Override
public HoodieSavepointMetadata savepoint(HoodieEngineContext context, String instantToSavepoint, String user, String comment) {
- return new SavepointActionExecutor(context, config, this, instantToSavepoint, user, comment).execute();
+ return new SavepointActionExecutor<>(context, config, this, instantToSavepoint, user, comment).execute();
}
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new CopyOnWriteRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ return new CopyOnWriteRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
}
@Override
public Option<HoodieRestorePlan> scheduleRestore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new RestorePlanActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ return new RestorePlanActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
}
}
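The Copy-on-Write table above now exposes HoodieData-based write APIs end to end. A minimal sketch of bridging between Spark's JavaRDD and HoodieData at a caller boundary, using the HoodieJavaRDD wrapper adopted throughout this commit; the helper name, class name, and variables below are illustrative placeholders, not part of this patch:

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.data.HoodieJavaRDD;
    import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
    import org.apache.hudi.table.action.HoodieWriteMetadata;
    import org.apache.spark.api.java.JavaRDD;

    class HoodieDataBridgeSketch {
      // Hypothetical helper (not part of this patch): wrap a JavaRDD into HoodieData,
      // call the engine-agnostic table API, then unwrap the resulting write statuses
      // for any caller that still needs a Spark RDD.
      static <T extends HoodieRecordPayload> JavaRDD<WriteStatus> insertViaHoodieData(
          HoodieSparkCopyOnWriteTable<T> table, HoodieEngineContext context,
          String instantTime, JavaRDD<HoodieRecord<T>> recordsRdd) {
        HoodieData<HoodieRecord<T>> records = HoodieJavaRDD.of(recordsRdd);
        HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.insert(context, instantTime, records);
        return HoodieJavaRDD.getJavaRDD(result.getWriteStatuses());
      }
    }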
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index 334efa7..efc667a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -24,6 +24,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,14 +32,13 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
+import org.apache.hudi.table.action.bootstrap.SparkBootstrapDeltaCommitActionExecutor;
import org.apache.hudi.table.action.compact.HoodieSparkMergeOnReadTableCompactor;
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
@@ -54,8 +54,6 @@ import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.RestorePlanActionExecutor;
-import org.apache.spark.api.java.JavaRDD;
-
import java.util.List;
import java.util.Map;
@@ -87,72 +85,72 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> insert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> insert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkInsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, JavaRDD<HoodieRecord<T>> records,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
- return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+ return new SparkBulkInsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config,
this, instantTime, records, userDefinedBulkInsertPartitioner).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> delete(HoodieEngineContext context, String instantTime, JavaRDD<HoodieKey> keys) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> delete(HoodieEngineContext context, String instantTime, HoodieData<HoodieKey> keys) {
return new SparkDeleteDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, keys).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> upsertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords) {
return new SparkUpsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> insertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords) {
return new SparkInsertPreppedDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, preppedRecords).execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
- JavaRDD<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsertPrepped(HoodieEngineContext context, String instantTime,
+ HoodieData<HoodieRecord<T>> preppedRecords, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
return new SparkBulkInsertPreppedDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config,
this, instantTime, preppedRecords, userDefinedBulkInsertPartitioner).execute();
}
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
- ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
+ ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor<>(
context, config, this, instantTime, extraMetadata,
- new HoodieSparkMergeOnReadTableCompactor());
+ new HoodieSparkMergeOnReadTableCompactor<>());
return scheduleCompactionExecutor.execute();
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
- RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
- context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor(),
- new HoodieSparkCopyOnWriteTable(config, context, getMetaClient()));
- return convertMetadata(compactionExecutor.execute());
+ RunCompactionActionExecutor<T> compactionExecutor = new RunCompactionActionExecutor<>(
+ context, config, this, compactionInstantTime, new HoodieSparkMergeOnReadTableCompactor<>(),
+ new HoodieSparkCopyOnWriteTable<>(config, context, getMetaClient()));
+ return compactionExecutor.execute();
}
@Override
- public HoodieBootstrapWriteMetadata<JavaRDD<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
- return new SparkBootstrapDeltaCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
+ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context, Option<Map<String, String>> extraMetadata) {
+ return new SparkBootstrapDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, extraMetadata).execute();
}
@Override
public void rollbackBootstrap(HoodieEngineContext context, String instantTime) {
new RestorePlanActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
- new MergeOnReadRestoreActionExecutor(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
+ new MergeOnReadRestoreActionExecutor<>(context, config, this, instantTime, HoodieTimeline.INIT_INSTANT_TS).execute();
}
@Override
@@ -169,12 +167,12 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipLocking) {
- return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
+ return new MergeOnReadRollbackActionExecutor<>(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override
public HoodieRestoreMetadata restore(HoodieEngineContext context, String restoreInstantTime, String instantToRestore) {
- return new MergeOnReadRestoreActionExecutor(context, config, this, restoreInstantTime, instantToRestore).execute();
+ return new MergeOnReadRestoreActionExecutor<>(context, config, this, restoreInstantTime, instantToRestore).execute();
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index bb8c95d..ce14d43 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -36,20 +36,16 @@ import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
-import org.apache.spark.api.java.JavaRDD;
import java.io.IOException;
-import static org.apache.hudi.data.HoodieJavaRDD.getJavaRDD;
-
public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
- extends HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+ extends HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
private volatile boolean isMetadataTableExists = false;
@@ -81,7 +77,7 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
- HoodieSparkTable hoodieSparkTable;
+ HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
hoodieSparkTable = new HoodieSparkCopyOnWriteTable<>(config, context, metaClient);
@@ -98,11 +94,6 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
return hoodieSparkTable;
}
- public static HoodieWriteMetadata<JavaRDD<WriteStatus>> convertMetadata(
- HoodieWriteMetadata<HoodieData<WriteStatus>> metadata) {
- return metadata.clone(getJavaRDD(metadata.getWriteStatuses()));
- }
-
@Override
protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return SparkHoodieIndexFactory.createIndex(config);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index a970e8f..504da8a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -28,10 +28,10 @@ import org.apache.hudi.client.bootstrap.HoodieSparkBootstrapSchemaProvider;
import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -68,7 +68,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -80,10 +79,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
import static org.apache.hudi.table.action.bootstrap.MetadataBootstrapHandlerFactory.getMetadataHandler;
public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
- extends BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieBootstrapWriteMetadata> {
+ extends BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>> {
private static final Logger LOG = LogManager.getLogger(SparkBootstrapCommitActionExecutor.class);
protected String bootstrapSchema = null;
@@ -91,7 +91,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
+ HoodieTable table,
Option<Map<String, String>> extraMetadata) {
super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
@@ -109,7 +109,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
}
@Override
- public HoodieBootstrapWriteMetadata execute() {
+ public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> execute() {
validate();
try {
HoodieTableMetaClient metaClient = table.getMetaClient();
@@ -121,9 +121,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = listAndProcessSourcePartitions();
// First run metadata bootstrap which will auto commit
- Option<HoodieWriteMetadata> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult = metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));
// if there are full bootstrap to be performed, perform that too
- Option<HoodieWriteMetadata> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
+ Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult = fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));
// Delete the marker directory for the instant
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
@@ -142,7 +142,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
* Perform Metadata Bootstrap.
* @param partitionFilesList List of partitions and files within that partitions
*/
- protected Option<HoodieWriteMetadata> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
if (null == partitionFilesList || partitionFilesList.isEmpty()) {
return Option.empty();
}
@@ -155,43 +155,42 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty());
- JavaRDD<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
+ HoodieData<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList);
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result);
return Option.of(result);
}
- private void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+ private void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
- writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+ writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
Instant indexStartTime = Instant.now();
// Update the index back
- JavaRDD<WriteStatus> statuses = HoodieJavaRDD.getJavaRDD(
- table.getIndex().updateLocation(HoodieJavaRDD.of(writeStatusRDD), context, table));
+ HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
commitOnAutoCommit(result);
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecords) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
// NO_OP
return null;
}
@Override
- protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+ protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
result.setCommitMetadata(Option.of(new HoodieCommitMetadata()));
}
@Override
- protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
// Perform bootstrap index write and then commit. Make sure both record-key and bootstrap-index
// is all done in a single job DAG.
Map<String, List<Pair<BootstrapFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats =
- result.getWriteStatuses().collect().stream()
+ result.getWriteStatuses().collectAsList().stream()
.map(w -> {
BootstrapWriteStatus ws = (BootstrapWriteStatus) w;
return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat());
@@ -214,7 +213,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
LOG.info("Committing metadata bootstrap !!");
}
- protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> stats) {
+ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> stats) {
String actionType = table.getMetaClient().getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType);
// Create a Hoodie table which encapsulated the commits and files visible
@@ -253,7 +252,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
* Perform Full Bootstrap.
* @param partitionFilesList List of partitions and files within that partitions
*/
- protected Option<HoodieWriteMetadata> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
+ protected Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) {
if (null == partitionFilesList || partitionFilesList.isEmpty()) {
return Option.empty();
}
@@ -271,10 +270,10 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
table.getActiveTimeline().createNewInstant(requested);
// Setup correct schema and run bulk insert.
- return Option.of(getBulkInsertActionExecutor(inputRecordsRDD).execute());
+ return Option.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());
}
- protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
+ protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, Option.empty(), extraMetadata);
@@ -310,10 +309,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
- private JavaRDD<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+ private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (null == partitions || partitions.isEmpty()) {
- return jsc.emptyRDD();
+ return context.emptyHoodieData();
}
TypedProperties properties = new TypedProperties();
@@ -336,7 +334,8 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
})
.collect(Collectors.toList());
- return jsc.parallelize(bootstrapPaths, config.getBootstrapParallelism())
+ context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table.");
+ return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
.map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
partitionFsPair.getRight().getLeft(), keyGenerator));
}
@@ -352,7 +351,7 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
}
@Override
- protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
+ protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
}
}
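In the executor above, runMetadataBootstrap now builds its output through HoodieEngineContext primitives (emptyHoodieData, parallelize) instead of a raw JavaSparkContext, and write statuses are persisted with the storage level read straight from the write config. A compact sketch of that pattern, assuming a context, config, and list of partition paths are in scope; partitionPaths and bootstrapOnePartition are placeholders for this sketch:

    // Illustrative only: engine-context primitives replace direct JavaSparkContext usage.
    HoodieData<String> paths = partitionPaths.isEmpty()
        ? context.emptyHoodieData()
        : context.parallelize(partitionPaths, config.getBootstrapParallelism());
    HoodieData<WriteStatus> statuses = paths.map(p -> bootstrapOnePartition(p));
    // Persist with the storage level read from the write config as a plain string.
    statuses.persist(config.getString(HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE));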
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
index 59f8666..d712ca4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapDeltaCommitActionExecutor.java
@@ -18,9 +18,8 @@
package org.apache.hudi.table.action.bootstrap;
-import java.util.Map;
-
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,7 +28,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkBulkInsertDeltaCommitActionExecutor;
-import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Map;
public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends SparkBootstrapCommitActionExecutor<T> {
@@ -41,7 +41,7 @@ public class SparkBootstrapDeltaCommitActionExecutor<T extends HoodieRecordPaylo
}
@Override
- protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) {
+ protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
return new SparkBulkInsertDeltaCommitActionExecutor((HoodieSparkEngineContext) context, new HoodieWriteConfig.Builder().withProps(config.getProps())
.withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
inputRecordsRDD, extraMetadata);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
index 594a910..7d2a4c0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java
@@ -28,14 +28,11 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
-import org.apache.spark.api.java.JavaRDD;
-
public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
@@ -52,10 +49,8 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = executeClustering(clusteringPlan);
- JavaRDD<WriteStatus> transformedWriteStatuses = HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses());
- return writeMetadata.clone(transformedWriteStatuses);
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return executeClustering(clusteringPlan);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ba3b0be..ade5508 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -19,8 +19,8 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieIOException;
@@ -54,13 +55,13 @@ import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
@@ -76,10 +77,13 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import scala.Tuple2;
+
import static org.apache.hudi.common.util.ClusteringUtils.getAllFileGroupsInPendingClusteringPlans;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> extends
- BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, HoodieWriteMetadata> {
+ BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class);
protected final Option<BaseKeyGenerator> keyGeneratorOpt;
@@ -97,7 +101,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
HoodieTable table,
String instantTime,
WriteOperationType operationType,
- Option extraMetadata) {
+ Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, operationType, extraMetadata);
try {
keyGeneratorOpt = config.populateMetaFields()
@@ -108,14 +112,14 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
- private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
Set<HoodieFileGroupId> fileGroupsInPendingClustering =
- table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
- UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
+ table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+ UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
- Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
- (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
+ Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+ updateStrategy.handleUpdate(inputRecords);
Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
return recordsAndPendingClusteringFileGroups.getLeft();
@@ -138,20 +142,20 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
- HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new HoodieWriteMetadata<>();
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
// Cache the tagged records, so we don't end up computing both
// TODO: Consistent contract in HoodieWriteClient regarding preppedRecord storage level handling
- if (inputRecordsRDD.getStorageLevel() == StorageLevel.NONE()) {
- inputRecordsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
+ JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
+ if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
+ inputRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
} else {
- LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
+ LOG.info("RDD PreppedRecords was persisted at: " + inputRDD.getStorageLevel());
}
WorkloadProfile workloadProfile = null;
if (isWorkloadProfileNeeded()) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
- workloadProfile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType, table.getIndex().canIndexLogFiles());
+ workloadProfile = new WorkloadProfile(buildProfile(inputRecords), operationType, table.getIndex().canIndexLogFiles());
LOG.info("Input workload profile :" + workloadProfile);
}
@@ -162,30 +166,23 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
// handle records update with clustering
- JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);
+ HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
- JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
- JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
- if (WriteOperationType.isChangingRecords(operationType)) {
- return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
- } else {
- return handleInsertPartition(instantTime, partition, recordItr, partitioner);
- }
- }, true).flatMap(List::iterator);
-
- updateIndexAndCommitIfNeeded(writeStatusRDD, result);
+ HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
+ updateIndexAndCommitIfNeeded(writeStatuses, result);
return result;
}
- private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(HoodieData<HoodieRecord<T>> inputRecords) {
HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
WorkloadStat globalStat = new WorkloadStat();
// group the records by partitionPath + currentLocation combination, count the number of
// records in each partition
- Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecordsRDD
- .mapToPair(record -> new Tuple2<>(
+ Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecords
+ .mapToPair(record -> Pair.of(
new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
.countByKey();
@@ -223,9 +220,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
- private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
- JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = dedupedRecords.mapToPair(
- record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));
+ private HoodieData<WriteStatus> mapPartitionsAsRDD(HoodieData<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
+ JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = HoodieJavaPairRDD.getJavaPairRDD(
+ dedupedRecords.mapToPair(record -> Pair.of(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)));
JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> partitionedRDD;
if (table.requireSortedRecords()) {
@@ -242,24 +239,28 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
// Partition only
partitionedRDD = mappedRDD.partitionBy(partitioner);
}
-
- return partitionedRDD.map(Tuple2::_2);
+ return HoodieJavaRDD.of(partitionedRDD.map(Tuple2::_2).mapPartitionsWithIndex((partition, recordItr) -> {
+ if (WriteOperationType.isChangingRecords(operationType)) {
+ return handleUpsertPartition(instantTime, partition, recordItr, partitioner);
+ } else {
+ return handleInsertPartition(instantTime, partition, recordItr, partitioner);
+ }
+ }, true).flatMap(List::iterator));
}
- protected JavaRDD<WriteStatus> updateIndex(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+ protected HoodieData<WriteStatus> updateIndex(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
- writeStatusRDD = writeStatusRDD.persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+ writeStatuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
Instant indexStartTime = Instant.now();
// Update the index back
- JavaRDD<WriteStatus> statuses = HoodieJavaRDD.getJavaRDD(
- table.getIndex().updateLocation(HoodieJavaRDD.of(writeStatusRDD), context, table));
+ HoodieData<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
result.setWriteStatuses(statuses);
return statuses;
}
- protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, HoodieWriteMetadata result) {
+ protected void updateIndexAndCommitIfNeeded(HoodieData<WriteStatus> writeStatusRDD, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
updateIndex(writeStatusRDD, result);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));
commitOnAutoCommit(result);
@@ -271,19 +272,19 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
@Override
- protected void setCommitMetadata(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
- result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collect(),
+ protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
+ result.setCommitMetadata(Option.of(CommitUtils.buildMetadata(result.getWriteStatuses().map(WriteStatus::getStat).collectAsList(),
result.getPartitionToReplaceFileIds(),
extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType())));
}
@Override
- protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
+ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect");
- commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collect());
+ commit(extraMetadata, result, result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
}
- protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<JavaRDD<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
+ protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<HoodieData<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
result.setCommitted(true);
@@ -304,7 +305,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
}
- protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeStatuses) {
+ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeStatuses) {
return Collections.emptyMap();
}
@@ -341,20 +342,20 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition with fileId => " + fileId);
- return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ return Collections.emptyIterator();
}
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(partitionPath, fileId, recordItr);
return handleUpdateInternal(upsertHandle, fileId);
}
- protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String fileId)
+ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
- SparkMergeHelper.newInstance().runMerge(table, upsertHandle);
+ HoodieMergeHelper.newInstance().runMerge(table, upsertHandle);
}
// TODO(vc): This needs to be revisited
@@ -383,9 +384,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
- return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
+ return Collections.emptyIterator();
}
- return new SparkLazyInsertIterable(recordItr, true, config, instantTime, table, idPfx,
+ return new SparkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
taskContextSupplier, new CreateHandleFactory<>());
}
@@ -393,7 +394,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
- return new UpsertPartitioner(profile, context, table, config);
+ return new UpsertPartitioner<>(profile, context, table, config);
}
public Partitioner getInsertPartitioner(WorkloadProfile profile) {
@@ -407,7 +408,7 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
}
@Override
- protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
+ protected void runPrecommitValidators(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
}
}
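The commit action executor above keeps records as HoodieData, but applying a Spark Partitioner still requires an RDD, so mapPartitionsAsRDD unwraps to a JavaPairRDD, partitions, and wraps the results back. A condensed sketch of that unwrap/partition/rewrap cycle; the real executor keys by (HoodieKey, current location), while this sketch keys by HoodieKey only, and records/partitioner are assumed in scope:

    // Illustrative: apply a Spark Partitioner to HoodieData by bridging through the RDD wrappers.
    JavaPairRDD<HoodieKey, HoodieRecord<T>> pairRdd = HoodieJavaPairRDD.getJavaPairRDD(
        records.mapToPair(r -> Pair.of(r.getKey(), r)));
    JavaRDD<HoodieRecord<T>> partitionedRdd = pairRdd.partitionBy(partitioner).map(Tuple2::_2);
    HoodieData<HoodieRecord<T>> partitioned = HoodieJavaRDD.of(partitionedRdd);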
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
index f4f1d3a..f4b01c8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -30,24 +31,22 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-
import java.util.Map;
public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>> extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
+ private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
}
public SparkBulkInsertCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
@@ -55,7 +54,7 @@ public class SparkBulkInsertCommitActionExecutor<T extends HoodieRecordPayload<T
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
try {
return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
this, true, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
index d0c5dde..38e3810 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java
@@ -19,6 +19,7 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
import org.apache.hudi.execution.bulkinsert.BulkInsertMapFunction;
import org.apache.hudi.io.CreateHandleFactory;
@@ -46,28 +48,28 @@ import java.util.stream.IntStream;
* @param <T>
*/
@SuppressWarnings("checkstyle:LineLength")
-public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, JavaRDD<HoodieRecord<T>>,
- JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> {
+public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends BaseBulkInsertHelper<T, HoodieData<HoodieRecord<T>>,
+ HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> {
private SparkBulkInsertHelper() {
}
private static class BulkInsertHelperHolder {
- private static final SparkBulkInsertHelper SPARK_BULK_INSERT_HELPER = new SparkBulkInsertHelper();
+ private static final SparkBulkInsertHelper HOODIE_BULK_INSERT_HELPER = new SparkBulkInsertHelper<>();
}
public static SparkBulkInsertHelper newInstance() {
- return BulkInsertHelperHolder.SPARK_BULK_INSERT_HELPER;
+ return BulkInsertHelperHolder.HOODIE_BULK_INSERT_HELPER;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> bulkInsert(final JavaRDD<HoodieRecord<T>> inputRecords,
- final String instantTime,
- final HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- final HoodieWriteConfig config,
- final BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, R> executor,
- final boolean performDedupe,
- final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> bulkInsert(final HoodieData<HoodieRecord<T>> inputRecords,
+ final String instantTime,
+ final HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+ final HoodieWriteConfig config,
+ final BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> executor,
+ final boolean performDedupe,
+ final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
HoodieWriteMetadata result = new HoodieWriteMetadata();
//transition bulk_insert state to inflight
@@ -75,7 +77,7 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
executor.getCommitActionType(), instantTime), Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());
// write new files
- JavaRDD<WriteStatus> writeStatuses =
+ HoodieData<WriteStatus> writeStatuses =
bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), new CreateHandleFactory(false));
//update index
((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);
@@ -83,39 +85,40 @@ public class SparkBulkInsertHelper<T extends HoodieRecordPayload, R> extends Bas
}
@Override
- public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> inputRecords,
- String instantTime,
- HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table,
- HoodieWriteConfig config,
- boolean performDedupe,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner,
- boolean useWriterSchema,
- int parallelism,
- WriteHandleFactory writeHandleFactory) {
+ public HoodieData<WriteStatus> bulkInsert(HoodieData<HoodieRecord<T>> inputRecords,
+ String instantTime,
+ HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
+ HoodieWriteConfig config,
+ boolean performDedupe,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner,
+ boolean useWriterSchema,
+ int parallelism,
+ WriteHandleFactory writeHandleFactory) {
// De-dupe/merge if needed
- JavaRDD<HoodieRecord<T>> dedupedRecords = inputRecords;
+ HoodieData<HoodieRecord<T>> dedupedRecords = inputRecords;
if (performDedupe) {
- dedupedRecords = (JavaRDD<HoodieRecord<T>>) SparkWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
+ dedupedRecords = (HoodieData<HoodieRecord<T>>) HoodieWriteHelper.newInstance().combineOnCondition(config.shouldCombineBeforeInsert(), inputRecords,
parallelism, table);
}
- final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+ final HoodieData<HoodieRecord<T>> repartitionedRecords;
BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.isPresent()
? userDefinedBulkInsertPartitioner.get()
: BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode());
- repartitionedRecords = (JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(dedupedRecords, parallelism);
+ // only JavaRDD is supported for Spark partitioner, but it is not enforced by BulkInsertPartitioner API. To improve this, TODO HUDI-3463
+ repartitionedRecords = HoodieJavaRDD.of((JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(HoodieJavaRDD.getJavaRDD(dedupedRecords), parallelism));
// generate new file ID prefixes for each output partition
final List<String> fileIDPrefixes =
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
- JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
- .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime,
+ JavaRDD<WriteStatus> writeStatusRDD = HoodieJavaRDD.getJavaRDD(repartitionedRecords)
+ .mapPartitionsWithIndex(new BulkInsertMapFunction<>(instantTime,
partitioner.arePartitionRecordsSorted(), config, table, fileIDPrefixes, useWriterSchema, writeHandleFactory), true)
.flatMap(List::iterator);
- return writeStatusRDD;
+ return HoodieJavaRDD.of(writeStatusRDD);
}
}
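
The hunk above keeps the partitioner step Spark-specific: existing BulkInsertPartitioner implementations still operate on JavaRDD (per the HUDI-3463 TODO), so the helper unwraps the engine-agnostic HoodieData, lets the partitioner repartition the raw RDD, and wraps the result back. A minimal sketch of that round trip, not part of the patch, assuming a Spark-only partitioner and the HoodieJavaRDD bridge shown in this commit:

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;
    import org.apache.hudi.data.HoodieJavaRDD;
    import org.apache.hudi.table.BulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    public class PartitionerRoundTripSketch {
      @SuppressWarnings("unchecked")
      public static <T extends HoodieRecordPayload> HoodieData<HoodieRecord<T>> repartition(
          HoodieData<HoodieRecord<T>> records, BulkInsertPartitioner partitioner, int parallelism) {
        // Unwrap to the Spark-specific JavaRDD that current partitioners expect ...
        JavaRDD<HoodieRecord<T>> rdd = HoodieJavaRDD.getJavaRDD(records);
        JavaRDD<HoodieRecord<T>> repartitioned =
            (JavaRDD<HoodieRecord<T>>) partitioner.repartitionRecords(rdd, parallelism);
        // ... then wrap back so the rest of the commit path stays engine-agnostic.
        return HoodieJavaRDD.of(repartitioned);
      }
    }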
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
index 28d8cb0..8862981 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertPreppedCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -30,25 +31,23 @@ import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-
public class SparkBulkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
- private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner;
+ private final HoodieData<HoodieRecord<T>> preppedInputRecordRdd;
+ private final Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner;
public SparkBulkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> userDefinedBulkInsertPartitioner) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedInputRecordRdd,
+ Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.userDefinedBulkInsertPartitioner = userDefinedBulkInsertPartitioner;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
try {
return SparkBulkInsertHelper.newInstance().bulkInsert(preppedInputRecordRdd, instantTime, table, config,
this, false, userDefinedBulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
index 997c7bf..a6fc996 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeleteCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
public class SparkDeleteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieKey> keys;
+ private final HoodieData<HoodieKey> keys;
public SparkDeleteCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieKey> keys) {
+ String instantTime, HoodieData<HoodieKey> keys) {
super(context, config, table, instantTime, WriteOperationType.DELETE);
this.keys = keys;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
}
}
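
With the delete helper renamed and moved to hudi-client-common, the Spark executor above only adapts types. A hedged caller-side sketch, not part of the patch, of how a JavaRDD of keys is bridged into the HoodieData the executor now expects (the key values and the surrounding setup are purely illustrative):

    import java.util.Arrays;

    import org.apache.hudi.client.common.HoodieSparkEngineContext;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.data.HoodieJavaRDD;
    import org.apache.hudi.table.HoodieTable;
    import org.apache.hudi.table.action.commit.SparkDeleteCommitActionExecutor;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DeleteKeysSketch {
      public static void deleteKeys(JavaSparkContext jsc, HoodieSparkEngineContext context,
                                    HoodieWriteConfig config, HoodieTable table, String instantTime) {
        // Keys to delete, produced however the caller likes (illustrative values).
        JavaRDD<HoodieKey> keyRdd =
            jsc.parallelize(Arrays.asList(new HoodieKey("uuid-1", "2022/03/17")), 1);
        // Bridge into the engine-agnostic type the executor now takes.
        HoodieData<HoodieKey> keys = HoodieJavaRDD.of(keyRdd);
        SparkDeleteCommitActionExecutor executor =
            new SparkDeleteCommitActionExecutor(context, config, table, instantTime, keys);
        executor.execute();
      }
    }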
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
index 90bcdc9..b31eb7b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
@@ -19,20 +19,18 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
import java.time.Duration;
import java.util.HashMap;
@@ -51,16 +49,15 @@ public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
HoodieTimer timer = new HoodieTimer().startTimer();
- Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
- .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
- HoodieWriteMetadata result = new HoodieWriteMetadata();
+ context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
+ Map<String, List<String>> partitionToReplaceFileIds = HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitions).distinct()
+ .mapToPair(partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
+ HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
-
- result.setWriteStatuses(jsc.emptyRDD());
+ result.setWriteStatuses(context.emptyHoodieData());
this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
this.commitOnAutoCommit(result);
return result;
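
The delete-partition executor above drops its direct JavaSparkContext usage: partitions are parallelized through the HoodieEngineContext, keyed with Hudi's own Pair instead of Scala's Tuple2, and the pair data is only unwrapped at the very end to collect the map. A minimal sketch of that shape, not part of the patch, where an empty list stands in for the real file-id lookup:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.data.HoodieJavaPairRDD;

    public class PartitionToFileIdsSketch {
      public static Map<String, List<String>> partitionToFileIds(HoodieEngineContext context,
                                                                 List<String> partitions) {
        // The empty list stands in for getAllExistingFileIds(partitionPath) in the real executor.
        return HoodieJavaPairRDD.getJavaPairRDD(
            context.parallelize(partitions).distinct()
                .mapToPair(partitionPath -> Pair.of(partitionPath, Collections.<String>emptyList())))
            .collectAsMap();
      }
    }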
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
index ba91fe1..479b513 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
public class SparkInsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
public SparkInsertCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, WriteOperationType.INSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
index 7a3549c..518063e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
@@ -19,18 +19,21 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+
import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaRDD;
-import scala.Tuple2;
import java.util.List;
import java.util.Map;
@@ -39,25 +42,25 @@ import java.util.stream.Collectors;
public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
this(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE);
}
public SparkInsertOverwriteCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
WriteOperationType writeOperationType) {
super(context, config, table, instantTime, writeOperationType);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(), this, operationType);
}
@@ -74,13 +77,13 @@ public class SparkInsertOverwriteCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
- protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
- return writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
- new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
+ return HoodieJavaPairRDD.getJavaPairRDD(writeMetadata.getWriteStatuses().map(status -> status.getStat().getPartitionPath()).distinct().mapToPair(partitionPath ->
+ Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
}
protected List<String> getAllExistingFileIds(String partitionPath) {
// because new commit is not complete. it is safe to mark all existing file Ids as old files
- return table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList());
+ return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
index f7c98d5..93d0a81 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteTableCommitActionExecutor.java
@@ -19,20 +19,19 @@
package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaPairRDD;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -41,21 +40,18 @@ public class SparkInsertOverwriteTableCommitActionExecutor<T extends HoodieRecor
public SparkInsertOverwriteTableCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, inputRecordsRDD, WriteOperationType.INSERT_OVERWRITE_TABLE);
}
@Override
- protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
- Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata) {
List<String> partitionPaths = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
- JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
- if (partitionPaths != null && partitionPaths.size() > 0) {
- context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
- JavaRDD<String> partitionPathRdd = jsc.parallelize(partitionPaths, partitionPaths.size());
- partitionToExistingFileIds = partitionPathRdd.mapToPair(
- partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+ if (partitionPaths == null || partitionPaths.isEmpty()) {
+ return Collections.emptyMap();
}
- return partitionToExistingFileIds;
+ context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+ return HoodieJavaPairRDD.getJavaPairRDD(context.parallelize(partitionPaths, partitionPaths.size()).mapToPair(
+ partitionPath -> Pair.of(partitionPath, getAllExistingFileIds(partitionPath)))).collectAsMap();
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
index 400147b..ff1a7e2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertPreppedCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
public class SparkInsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedRecords;
+ private final HoodieData<HoodieRecord<T>> preppedRecords;
public SparkInsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return super.execute(preppedRecords);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
index c914384..ccee9cf 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
public class SparkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
index e36073f..73d4085 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertPreppedCommitActionExecutor.java
@@ -20,29 +20,28 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
-
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
public class SparkUpsertPreppedCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedRecords;
+ private final HoodieData<HoodieRecord<T>> preppedRecords;
public SparkUpsertPreppedCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return super.execute(preppedRecords);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 6ca4408..61cb1ff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table.action.compact;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -27,10 +26,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.table.HoodieTable;
-import org.apache.spark.api.java.JavaRDD;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;
/**
* Compacts a hoodie table with merge on read storage. Computes all possible compactions,
@@ -39,7 +37,7 @@ import org.apache.spark.api.java.JavaRDD;
*/
@SuppressWarnings("checkstyle:LineLength")
public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
- extends HoodieCompactor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+ extends HoodieCompactor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> {
@Override
public void preCompact(
@@ -53,6 +51,6 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
@Override
public void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
- HoodieJavaRDD.getJavaRDD(writeStatus).persist(SparkMemoryUtils.getWriteStatusStorageLevel(config.getProps()));
+ writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
}
}
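
Persisting write statuses no longer needs the Spark-only SparkMemoryUtils helper: HoodieData exposes persist(String), and the storage level name comes straight from the write config. A small sketch, not part of the patch, mirroring the compactor change above:

    import static org.apache.hudi.config.HoodieWriteConfig.WRITE_STATUS_STORAGE_LEVEL_VALUE;

    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class PersistWriteStatusSketch {
      public static void maybePersist(HoodieData<WriteStatus> writeStatus, HoodieWriteConfig config) {
        // Storage level is a plain string (typically "MEMORY_AND_DISK_SER");
        // the engine-specific HoodieData implementation decides how to honor it.
        writeStatus.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE));
      }
    }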
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
index 222506e..61e6f25 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseSparkDeltaCommitActionExecutor.java
@@ -18,7 +18,6 @@
package org.apache.hudi.table.action.deltacommit;
-import java.util.Map;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
@@ -42,13 +41,14 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkCommitActionExecutor<T> {
private static final Logger LOG = LogManager.getLogger(BaseSparkDeltaCommitActionExecutor.class);
// UpsertPartitioner for MergeOnRead table type
- private SparkUpsertDeltaCommitPartitioner mergeOnReadUpsertPartitioner;
+ private SparkUpsertDeltaCommitPartitioner<T> mergeOnReadUpsertPartitioner;
public BaseSparkDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
String instantTime, WriteOperationType operationType) {
@@ -66,7 +66,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordP
if (profile == null) {
throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
}
- mergeOnReadUpsertPartitioner = new SparkUpsertDeltaCommitPartitioner(profile, (HoodieSparkEngineContext) context, table, config);
+ mergeOnReadUpsertPartitioner = new SparkUpsertDeltaCommitPartitioner<>(profile, (HoodieSparkEngineContext) context, table, config);
return mergeOnReadUpsertPartitioner;
}
@@ -79,7 +79,7 @@ public abstract class BaseSparkDeltaCommitActionExecutor<T extends HoodieRecordP
LOG.info("Small file corrections for updates for commit " + instantTime + " for file " + fileId);
return super.handleUpdate(partitionPath, fileId, recordItr);
} else {
- HoodieAppendHandle<?,?,?,?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
+ HoodieAppendHandle<?, ?, ?, ?> appendHandle = new HoodieAppendHandle<>(config, instantTime, table,
partitionPath, fileId, recordItr, taskContextSupplier);
appendHandle.doAppend();
return Collections.singletonList(appendHandle.close()).iterator();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
index 6f23e41..190a714 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -31,25 +32,23 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
-import org.apache.spark.api.java.JavaRDD;
-
import java.util.Map;
public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
- private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
+ private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
this(context, config, table, instantTime, inputRecordsRDD, bulkInsertPartitioner, Option.empty());
}
public SparkBulkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context, HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner,
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner,
Option<Map<String, String>> extraMetadata) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT, extraMetadata);
this.inputRecordsRDD = inputRecordsRDD;
@@ -57,7 +56,7 @@ public class SparkBulkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayl
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
try {
return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
this, true, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
index be5b903..c01bce2 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkBulkInsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -31,25 +32,23 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
-import org.apache.spark.api.java.JavaRDD;
-
public class SparkBulkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedInputRecordRdd;
- private final Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner;
+ private final HoodieData<HoodieRecord<T>> preppedInputRecordRdd;
+ private final Option<BulkInsertPartitioner> bulkInsertPartitioner;
public SparkBulkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedInputRecordRdd,
- Option<BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>>> bulkInsertPartitioner) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedInputRecordRdd,
+ Option<BulkInsertPartitioner> bulkInsertPartitioner) {
super(context, config, table, instantTime, WriteOperationType.BULK_INSERT);
this.preppedInputRecordRdd = preppedInputRecordRdd;
this.bulkInsertPartitioner = bulkInsertPartitioner;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
try {
return SparkBulkInsertHelper.newInstance().bulkInsert(preppedInputRecordRdd, instantTime, table, config,
this, false, bulkInsertPartitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
index 7cff563..9a5b08d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkDeleteDeltaCommitActionExecutor.java
@@ -20,30 +20,29 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkDeleteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieDeleteHelper;
public class SparkDeleteDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieKey> keys;
+ private final HoodieData<HoodieKey> keys;
public SparkDeleteDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieKey> keys) {
+ String instantTime, HoodieData<HoodieKey> keys) {
super(context, config, table, instantTime, WriteOperationType.DELETE);
this.keys = keys;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieDeleteHelper.newInstance().execute(instantTime, keys, context, config, table, this);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
index 7e38823..4889460 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertDeltaCommitActionExecutor.java
@@ -20,31 +20,30 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
public class SparkInsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
public SparkInsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, WriteOperationType.INSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
- return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(),this, operationType);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
index e401d95..dbf0cbc 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkInsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -27,22 +28,20 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-
public class SparkInsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedRecords;
+ private final HoodieData<HoodieRecord<T>> preppedRecords;
public SparkInsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
super(context, config, table, instantTime, WriteOperationType.INSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return super.execute(preppedRecords);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
index c63be62..67ecb9a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
@@ -18,32 +18,32 @@
package org.apache.hudi.table.action.deltacommit;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
-
-import org.apache.spark.api.java.JavaRDD;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
public class SparkUpsertDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private JavaRDD<HoodieRecord<T>> inputRecordsRDD;
+ private final HoodieData<HoodieRecord<T>> inputRecordsRDD;
public SparkUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ String instantTime, HoodieData<HoodieRecord<T>> inputRecordsRDD) {
super(context, config, table, instantTime, WriteOperationType.UPSERT);
this.inputRecordsRDD = inputRecordsRDD;
}
@Override
- public HoodieWriteMetadata execute() {
- return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, operationType);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
index f593fea..9540030 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertPreppedDeltaCommitActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.deltacommit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
@@ -27,22 +28,20 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.spark.api.java.JavaRDD;
-
public class SparkUpsertPreppedDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseSparkDeltaCommitActionExecutor<T> {
- private final JavaRDD<HoodieRecord<T>> preppedRecords;
+ private final HoodieData<HoodieRecord<T>> preppedRecords;
public SparkUpsertPreppedDeltaCommitActionExecutor(HoodieSparkEngineContext context,
HoodieWriteConfig config, HoodieTable table,
- String instantTime, JavaRDD<HoodieRecord<T>> preppedRecords) {
+ String instantTime, HoodieData<HoodieRecord<T>> preppedRecords) {
super(context, config, table, instantTime, WriteOperationType.UPSERT_PREPPED);
this.preppedRecords = preppedRecords;
}
@Override
- public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return super.execute(preppedRecords);
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 6e67bd6..ce0cc37 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -32,6 +32,7 @@ import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -76,6 +77,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
@@ -83,17 +85,19 @@ import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.table.action.commit.SparkWriteHelper;
+import org.apache.hudi.table.action.commit.HoodieWriteHelper;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -451,13 +455,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieRecord recordThree =
new HoodieAvroRecord(keyTwo, dataGen.generateRandomValue(keyTwo, newCommitTime));
- JavaRDD<HoodieRecord<RawTripTestPayload>> records =
- jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
+ HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
+ jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
// Global dedup should be done based on recordKey only
HoodieIndex index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(true);
- List<HoodieRecord<RawTripTestPayload>> dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+ List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
assertEquals(1, dedupedRecs.size());
assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
assertNodupesWithinPartition(dedupedRecs);
@@ -465,7 +469,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// non-Global dedup should be done based on both recordKey and partitionPath
index = mock(HoodieIndex.class);
when(index.isGlobal()).thenReturn(false);
- dedupedRecs = SparkWriteHelper.newInstance().deduplicateRecords(records, index, 1).collect();
+ dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
assertEquals(2, dedupedRecs.size());
assertNodupesWithinPartition(dedupedRecs);
@@ -779,6 +783,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
firstInsertRecords + secondInsertRecords, 2, false, config.populateMetaFields());
}
+ @Test
+ public void testBulkInsertWithCustomPartitioner() {
+ HoodieWriteConfig config = getConfigBuilder().withRollbackUsingMarkers(true).build();
+ try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+ final String commitTime1 = "001";
+ client.startCommitWithTime(commitTime1);
+ List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
+ BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[]{"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, false);
+ List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
+ assertNoWriteErrors(statuses);
+ }
+ }
+
/**
* Tests deletion of records.
*/
@@ -2594,7 +2612,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
}
- public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
+ public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends HoodieData<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
super(table, context, config);
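
The new testBulkInsertWithCustomPartitioner above exercises the public bulkInsert(records, instantTime, Option.of(partitioner)) path with the stock RDDCustomColumnsSortPartitioner. A user-supplied partitioner only has to satisfy the two-method contract visible in this patch (repartitionRecords and arePartitionRecordsSorted); the sketch below is a made-up example, not a Hudi class:

    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.table.BulkInsertPartitioner;
    import org.apache.spark.api.java.JavaRDD;

    public class SortByRecordKeyPartitioner implements BulkInsertPartitioner<JavaRDD<HoodieRecord>> {
      @Override
      public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> records, int outputSparkPartitions) {
        // Sort globally by record key so each bulk-insert file handle sees a contiguous key range.
        return records.sortBy(HoodieRecord::getRecordKey, true, outputSparkPartitions);
      }

      @Override
      public boolean arePartitionRecordsSorted() {
        // Signals to the writer that records within each output partition arrive sorted.
        return true;
      }
    }

It would be handed to the client exactly as in the test, i.e. client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(new SortByRecordKeyPartitioner())).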
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index dcc41ad..b9f0252 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
@@ -556,7 +557,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
// initialize partitioner
hoodieTable.getHoodieView().sync();
BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
- newDeleteTime, deleteRDD);
+ newDeleteTime, HoodieJavaRDD.of(deleteRDD));
actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 53cd6e5..0b29cf2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -38,6 +39,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLayoutConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hudi.index.HoodieIndex;
@@ -325,7 +327,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Insert new records
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
- firstCommitTime, jsc.parallelize(records));
+ firstCommitTime, context.parallelize(records));
List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator());
}).flatMap(Transformations::flattenAsIterator).collect();
@@ -368,7 +370,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Insert new records
final List<HoodieRecord> recs2 = records;
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertPreppedCommitActionExecutor(context, config, table,
- instantTime, jsc.parallelize(recs2));
+ instantTime, context.parallelize(recs2));
List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs2.iterator());
}).flatMap(Transformations::flattenAsIterator).collect();
@@ -389,7 +391,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Insert new records
final List<HoodieRecord> recs3 = records;
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertPreppedCommitActionExecutor(context, config, table,
- instantTime, jsc.parallelize(recs3));
+ instantTime, context.parallelize(recs3));
returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), recs3.iterator());
}).flatMap(Transformations::flattenAsIterator).collect();
@@ -422,7 +424,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Insert new records
BaseSparkCommitActionExecutor actionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
- instantTime, jsc.parallelize(records));
+ instantTime, context.parallelize(records));
jsc.parallelize(Arrays.asList(1))
.map(i -> actionExecutor.handleInsert(FSUtils.createNewFileIdPfx(), records.iterator()))
.map(Transformations::flatten).collect();
@@ -452,7 +454,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
- instantTime, jsc.parallelize(inserts));
+ instantTime, context.parallelize(inserts));
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
}).map(Transformations::flatten).collect();
@@ -466,7 +468,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
long numRecordsInPartition = updates.stream().filter(u -> u.getPartitionPath().equals(partitionPath)).count();
table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
- instantTime, jsc.parallelize(updates));
+ instantTime, context.parallelize(updates));
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(Transformations::flatten).collect();
@@ -486,8 +488,8 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
// Insert new records
final JavaRDD<HoodieRecord> inputRecords = generateTestRecordsForBulkInsert(jsc);
SparkBulkInsertCommitActionExecutor bulkInsertExecutor = new SparkBulkInsertCommitActionExecutor(
- context, config, table, instantTime, inputRecords, Option.empty());
- List<WriteStatus> returnedStatuses = ((JavaRDD<WriteStatus>)bulkInsertExecutor.execute().getWriteStatuses()).collect();
+ context, config, table, instantTime, HoodieJavaRDD.of(inputRecords), Option.empty());
+ List<WriteStatus> returnedStatuses = ((HoodieData<WriteStatus>) bulkInsertExecutor.execute().getWriteStatuses()).collectAsList();
verifyStatusResult(returnedStatuses, generateExpectedPartitionNumRecords(inputRecords));
}
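Note on the test changes above: the pattern is to wrap Spark-specific JavaRDDs into the engine-agnostic HoodieData handle before handing them to the commit executors, and to read results back with collectAsList() instead of JavaRDD#collect(). A minimal sketch of that round trip, not part of the patch (the helper class and the raw HoodieRecord type are illustrative):

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.data.HoodieJavaRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    import java.util.List;

    class HoodieJavaRDDRoundTripSketch {
      // Wrap a plain JavaRDD into the engine-agnostic HoodieData handle and collect it back.
      static List<HoodieRecord> roundTrip(JavaSparkContext jsc, List<HoodieRecord> records) {
        JavaRDD<HoodieRecord> rdd = jsc.parallelize(records);   // Spark-specific input
        HoodieData<HoodieRecord> data = HoodieJavaRDD.of(rdd);   // wrapper the executors now accept
        return data.collectAsList();                             // replaces JavaRDD#collect in the updated tests
      }
    }

In the tests themselves, context.parallelize(...) on the HoodieSparkEngineContext produces such a HoodieData handle directly, which is why the jsc.parallelize calls feeding the executors are replaced.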
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java
deleted file mode 100644
index 2d852f8..0000000
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestDeleteHelper.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.Partition;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class TestDeleteHelper {
-
- private enum CombineTestMode {
- None, GlobalIndex, NoneGlobalIndex;
- }
-
- private static final String BASE_PATH = "/tmp/";
- private static final boolean WITH_COMBINE = true;
- private static final boolean WITHOUT_COMBINE = false;
- private static final int DELETE_PARALLELISM = 200;
-
- @Mock
- private HoodieIndex index;
- @Mock
- private HoodieTable<EmptyHoodieRecordPayload, JavaRDD<HoodieRecord>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table;
- @Mock
- private BaseSparkCommitActionExecutor<EmptyHoodieRecordPayload> executor;
- @Mock
- private HoodieWriteMetadata metadata;
- @Mock
- private JavaPairRDD keyPairs;
- @Mock
- private JavaSparkContext jsc;
- @Mock
- private HoodieSparkEngineContext context;
-
- private JavaRDD<HoodieKey> rddToDelete;
- private HoodieWriteConfig config;
-
- @BeforeEach
- public void setUp() {
- when(table.getIndex()).thenReturn(index);
- when(context.getJavaSparkContext()).thenReturn(jsc);
- }
-
- @Test
- public void deleteWithEmptyRDDShouldNotExecute() {
- rddToDelete = mockEmptyHoodieKeyRdd();
- config = newWriteConfig(WITHOUT_COMBINE);
-
- SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
- verify(rddToDelete, never()).repartition(DELETE_PARALLELISM);
- verifyNoDeleteExecution();
- }
-
- @Test
- public void deleteWithoutCombineShouldRepartitionForNonEmptyRdd() {
- rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.None);
- config = newWriteConfig(WITHOUT_COMBINE);
-
- SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
- verify(rddToDelete, times(1)).repartition(DELETE_PARALLELISM);
- verifyDeleteExecution();
- }
-
- @Test
- public void deleteWithCombineShouldRepartitionForNonEmptyRddAndNonGlobalIndex() {
- rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.NoneGlobalIndex);
- config = newWriteConfig(WITH_COMBINE);
-
- SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
- verify(rddToDelete, times(1)).distinct(DELETE_PARALLELISM);
- verifyDeleteExecution();
- }
-
- @Test
- public void deleteWithCombineShouldRepartitionForNonEmptyRddAndGlobalIndex() {
- rddToDelete = newHoodieKeysRddMock(2, CombineTestMode.GlobalIndex);
- config = newWriteConfig(WITH_COMBINE);
- when(index.isGlobal()).thenReturn(true);
-
- SparkDeleteHelper.newInstance().execute("test-time", rddToDelete, context, config, table, executor);
-
- verify(keyPairs, times(1)).reduceByKey(any(), eq(DELETE_PARALLELISM));
- verifyDeleteExecution();
- }
-
- private void verifyDeleteExecution() {
- verify(executor, times(1)).execute(any());
- verify(metadata, times(1)).setIndexLookupDuration(any());
- }
-
- private void verifyNoDeleteExecution() {
- verify(executor, never()).execute(any());
- }
-
- private HoodieWriteConfig newWriteConfig(boolean combine) {
- return HoodieWriteConfig.newBuilder()
- .combineDeleteInput(combine)
- .withPath(BASE_PATH)
- .withDeleteParallelism(DELETE_PARALLELISM)
- .build();
- }
-
- private JavaRDD<HoodieKey> newHoodieKeysRddMock(int howMany, CombineTestMode combineMode) {
- JavaRDD<HoodieKey> keysToDelete = mock(JavaRDD.class);
-
- JavaRDD recordsRdd = mock(JavaRDD.class);
- when(recordsRdd.filter(any())).thenReturn(recordsRdd);
- when(recordsRdd.isEmpty()).thenReturn(howMany <= 0);
- when(index.tagLocation(any(), any(), any())).thenReturn(HoodieJavaRDD.of(recordsRdd));
-
- if (combineMode == CombineTestMode.GlobalIndex) {
- when(keyPairs.reduceByKey(any(), anyInt())).thenReturn(keyPairs);
- when(keyPairs.values()).thenReturn(keysToDelete);
- when(keysToDelete.keyBy(any())).thenReturn(keyPairs);
- } else if (combineMode == CombineTestMode.NoneGlobalIndex) {
- when(keysToDelete.distinct(anyInt())).thenReturn(keysToDelete);
- } else if (combineMode == CombineTestMode.None) {
- List<Partition> parts = mock(List.class);
- when(parts.isEmpty()).thenReturn(howMany <= 0);
- when(keysToDelete.repartition(anyInt())).thenReturn(keysToDelete);
- when(keysToDelete.partitions()).thenReturn(parts);
- }
-
- when(keysToDelete.map(any())).thenReturn(recordsRdd);
- when(executor.execute(any())).thenReturn(metadata);
- return keysToDelete;
- }
-
- private JavaRDD<HoodieKey> mockEmptyHoodieKeyRdd() {
- JavaRDD<HoodieKey> emptyRdd = mock(JavaRDD.class);
- doReturn(true).when(emptyRdd).isEmpty();
- doReturn(Collections.emptyList()).when(emptyRdd).partitions();
- doReturn(emptyRdd).when(emptyRdd).map(any());
-
- doReturn(HoodieJavaRDD.of(emptyRdd)).when(index).tagLocation(any(), any(), any());
- doReturn(emptyRdd).when(emptyRdd).filter(any());
-
- doNothing().when(executor).saveWorkloadProfileMetadataToInflight(any(), anyString());
- doReturn(emptyRdd).when(jsc).emptyRDD();
- return emptyRdd;
- }
-
-}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 9afe5f3..59174a9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.compact;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -195,12 +196,12 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
String compactionInstantTime = "102";
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.getMetaClient().reloadActiveTimeline();
- JavaRDD<WriteStatus> result = (JavaRDD<WriteStatus>) table.compact(
+ HoodieData<WriteStatus> result = (HoodieData<WriteStatus>) table.compact(
context, compactionInstantTime).getWriteStatuses();
// Verify that all partition paths are present in the WriteStatus result
for (String partitionPath : dataGen.getPartitionPaths()) {
- List<WriteStatus> writeStatuses = result.collect();
+ List<WriteStatus> writeStatuses = result.collectAsList();
assertTrue(writeStatuses.stream()
.filter(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)).count() > 0);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 093fd43..4e8d2b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -97,6 +97,12 @@ public abstract class HoodieData<T> implements Serializable {
*/
public abstract HoodieData<T> distinct();
+ public abstract HoodieData<T> distinct(int parallelism);
+
+ public abstract <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism);
+
+ public abstract HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc);
+
/**
* Unions this {@link HoodieData} with other {@link HoodieData}.
* @param other {@link HoodieData} of interest.
@@ -108,4 +114,6 @@ public abstract class HoodieData<T> implements Serializable {
* @return collected results in {@link List<T>}.
*/
public abstract List<T> collectAsList();
+
+ public abstract HoodieData<T> repartition(int parallelism);
}
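The abstract operations added above (distinct with a parallelism hint, distinctWithKey, filter, repartition) are what let the commit executors drop their direct JavaRDD calls. A minimal usage sketch against the in-memory HoodieList implementation introduced in this patch; the sample data and class name are illustrative, and the parallelism argument is only a hint that the Spark-backed implementation honors:

    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.data.HoodieList;
    import org.apache.hudi.common.util.collection.Pair;

    import java.util.Arrays;

    class HoodieDataApiSketch {
      public static void main(String[] args) {
        HoodieData<Pair<String, Integer>> data = HoodieList.of(Arrays.asList(
            Pair.of("k1", 1), Pair.of("k1", 10), Pair.of("k2", 2)));

        // filter: keep only records matching a predicate
        System.out.println(data.filter(p -> p.getLeft().equals("k1")).collectAsList());

        // distinctWithKey: keep one record per key, with a parallelism hint
        System.out.println(data.distinctWithKey(Pair::getLeft, 2).collectAsList());

        // repartition is a no-op for the in-memory HoodieList; the Spark-backed
        // HoodieJavaRDD is the implementation that actually redistributes data
        HoodieData<Pair<String, Integer>> rebalanced = data.repartition(2);
        System.out.println(rebalanced.collectAsList());
      }
    }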
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
index 9441619..c23e712 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieList.java
@@ -133,6 +133,26 @@ public class HoodieList<T> extends HoodieData<T> {
}
@Override
+ public HoodieData<T> distinct(int parallelism) {
+ return distinct();
+ }
+
+ @Override
+ public <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) {
+ return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
+ .reduceByKey((value1, value2) -> value1, parallelism)
+ .values();
+ }
+
+ @Override
+ public HoodieData<T> filter(SerializableFunction<T, Boolean> filterFunc) {
+ return HoodieList.of(listData
+ .stream()
+ .filter(i -> throwingMapWrapper(filterFunc).apply(i))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
public HoodieData<T> union(HoodieData<T> other) {
List<T> unionResult = new ArrayList<>();
unionResult.addAll(listData);
@@ -144,4 +164,10 @@ public class HoodieList<T> extends HoodieData<T> {
public List<T> collectAsList() {
return listData;
}
+
+ @Override
+ public HoodieData<T> repartition(int parallelism) {
+ // no op
+ return this;
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
index c941231..1e125c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieMapPair.java
@@ -20,6 +20,7 @@
package org.apache.hudi.common.data;
import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -111,6 +113,15 @@ public class HoodieMapPair<K, V> extends HoodiePairData<K, V> {
}
@Override
+ public HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism) {
+ return HoodieMapPair.of(mapPairData.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> {
+ Option<V> reducedValue = Option.fromJavaOptional(e.getValue().stream().reduce(func::apply));
+ return reducedValue.isPresent() ? Collections.singletonList(reducedValue.get()) : Collections.emptyList();
+ })));
+ }
+
+ @Override
public <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func) {
Function<Pair<K, V>, O> throwableFunc = throwingMapWrapper(func);
return HoodieList.of(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index b9bdcb3..9ff5279 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.data;
+import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Option;
@@ -72,6 +73,8 @@ public abstract class HoodiePairData<K, V> implements Serializable {
*/
public abstract Map<K, Long> countByKey();
+ public abstract HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> func, int parallelism);
+
/**
* @param func serializable map function.
* @param <O> output object type.
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
new file mode 100644
index 0000000..6130d4a
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieList.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.data;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestHoodieList {
+
+ private static Stream<Arguments> distinctWithKey() {
+ return Stream.of(
+ Arguments.of(
+ Arrays.asList(Pair.of("k1", 1), Pair.of("k2", 2)),
+ Arrays.asList(Pair.of("k1", 1), Pair.of("k1", 10), Pair.of("k1", 100), Pair.of("k2", 2)))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ void distinctWithKey(List<Pair<String, Integer>> expected, List<Pair<String, Integer>> originalList) {
+ List<Pair<String, Integer>> distinctList = HoodieList.of(originalList).distinctWithKey(Pair::getLeft, 1).collectAsList();
+ assertEquals(expected, distinctList);
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
index 86b1a21..20e9a8f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieMapPair.java
@@ -25,6 +25,9 @@ import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,7 +36,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestHoodieMapPair {
@@ -117,6 +123,29 @@ public class TestHoodieMapPair {
})));
}
+ private static Stream<Arguments> testReduceByKey() {
+ return Stream.of(
+ Arguments.of(
+ createImmutableMap(
+ Pair.of(1, createImmutableList(1001)),
+ Pair.of(2, createImmutableList(2001)),
+ Pair.of(3, createImmutableList(3001)),
+ Pair.of(4, createImmutableList())),
+ createImmutableMap(
+ Pair.of(1, createImmutableList(1001, 1002, 1003)),
+ Pair.of(2, createImmutableList(2001, 2002)),
+ Pair.of(3, createImmutableList(3001)),
+ Pair.of(4, createImmutableList())))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void testReduceByKey(Map<Integer, List<Integer>> expected, Map<Integer, List<Integer>> original) {
+ HoodiePairData<Integer, Integer> reduced = HoodieMapPair.of(original).reduceByKey((a, b) -> a, 1);
+ assertEquals(expected, HoodieMapPair.getMapPair(reduced));
+ }
+
@Test
public void testLeftOuterJoinSingleValuePerKey() {
HoodiePairData<String, String> pairData1 = constructTestMapPairData(Arrays.asList(