You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:18:46 UTC
[hudi] 03/40: [HUDI-681] Remove embeddedTimelineService from
HoodieReadClient (#1388)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1d16b151b9079c4ad9bfb08bfa6114c3ad585e0f
Author: hongdd <jn...@163.com>
AuthorDate: Mon Mar 9 18:31:04 2020 +0800
[HUDI-681] Remove embeddedTimelineService from HoodieReadClient (#1388)
* [HUDI-681] Remove embeddedTimelineService from HoodieReadClient
---
.../org/apache/hudi/client/HoodieReadClient.java | 21 ++-------------------
.../main/java/org/apache/hudi/DataSourceUtils.java | 10 ++++------
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +----
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
4 files changed, 8 insertions(+), 30 deletions(-)
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 33d661b..d1e92b5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -72,18 +71,10 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
/**
* @param basePath path to Hoodie table
*/
- public HoodieReadClient(JavaSparkContext jsc, String basePath, Option<EmbeddedTimelineService> timelineService) {
+ public HoodieReadClient(JavaSparkContext jsc, String basePath) {
this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
// by default we use HoodieBloomIndex
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build(),
- timelineService);
- }
-
- /**
- * @param basePath path to Hoodie table
- */
- public HoodieReadClient(JavaSparkContext jsc, String basePath) {
- this(jsc, basePath, Option.empty());
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build());
}
/**
@@ -100,14 +91,6 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
* @param clientConfig instance of HoodieWriteConfig
*/
public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) {
- this(jsc, clientConfig, Option.empty());
- }
-
- /**
- * @param clientConfig instance of HoodieWriteConfig
- */
- public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
- Option<EmbeddedTimelineService> timelineService) {
this.jsc = jsc;
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 35c5955..475a925 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -21,11 +21,9 @@ package org.apache.hudi;
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -184,9 +182,9 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
- HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+ HoodieWriteConfig writeConfig) {
try {
- HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
+ HoodieReadClient client = new HoodieReadClient<>(jssc, writeConfig);
return client.tagLocation(incomingHoodieRecords)
.filter(r -> !((HoodieRecord<HoodieRecordPayload>) r).isCurrentLocationKnown());
} catch (TableNotFoundException e) {
@@ -198,10 +196,10 @@ public class DataSourceUtils {
@SuppressWarnings("unchecked")
public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
- Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+ Map<String, String> parameters) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
- return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
+ return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}
public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 80a01d3..326595f 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -131,10 +131,7 @@ private[hudi] object HoodieSparkSqlWriter {
val hoodieRecords =
if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
- DataSourceUtils.dropDuplicates(
- jsc,
- hoodieAllIncomingRecords,
- mapAsJavaMap(parameters), client.getTimelineServer)
+ DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
} else {
hoodieAllIncomingRecords
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 97d3d42..4b69d22 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -357,7 +357,7 @@ public class DeltaSync implements Serializable {
if (cfg.filterDupes) {
// turn upserts to insert
cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
- records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(), writeClient.getTimelineServer());
+ records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig());
}
boolean isEmpty = records.isEmpty();