Posted to commits@hudi.apache.org by xu...@apache.org on 2022/06/13 23:45:48 UTC
[hudi] 01/01: [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch release-feature-rfc55
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c88813d8a2b6f04d087fd575ebff35b8dd0da748
Author: 冯健 <fe...@gmail.com>
AuthorDate: Mon Jun 13 01:55:09 2022 +0800
[HUDI-3730] Improve meta sync class design and hierarchies (#5754)
Co-authored-by: jian.feng <fe...@gmial.com>
Co-authored-by: jian.feng <ji...@shopee.com>
---
.../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 20 +-
.../hudi/aws/sync/AwsGlueCatalogSyncTool.java | 4 +-
.../apache/hudi/sink/utils/HiveSyncContext.java | 46 ++---
.../hudi/sink/utils/TestHiveSyncContext.java | 4 +-
.../apache/hudi/gcp/bigquery/BigQuerySyncTool.java | 4 +-
.../gcp/bigquery/HoodieBigQuerySyncClient.java | 41 +---
.../integ/testsuite/dag/nodes/HiveQueryNode.java | 4 +-
.../writers/KafkaConnectTransactionServices.java | 3 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 67 +++----
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 78 +++----
.../hudi/command/MergeIntoHoodieTableCommand.scala | 6 +-
.../java/org/apache/hudi/TestDataSourceUtils.java | 12 +-
.../hudi/sync/adb/AbstractAdbSyncHoodieClient.java | 27 +--
.../org/apache/hudi/sync/adb/AdbSyncConfig.java | 215 ++++++++++----------
.../java/org/apache/hudi/sync/adb/AdbSyncTool.java | 63 +++---
.../apache/hudi/sync/adb/HoodieAdbJdbcClient.java | 65 +++---
.../apache/hudi/sync/adb/TestAdbSyncConfig.java | 54 ++---
.../hudi/sync/datahub/DataHubSyncClient.java | 59 +-----
.../apache/hudi/sync/datahub/DataHubSyncTool.java | 10 +-
.../config/HoodieDataHubDatasetIdentifier.java | 2 +-
.../hudi/hive/AbstractHiveSyncHoodieClient.java | 16 +-
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 223 ++++++++++-----------
.../java/org/apache/hudi/hive/HiveSyncTool.java | 74 +++----
.../org/apache/hudi/hive/HoodieHiveClient.java | 40 ++--
.../org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 54 ++---
.../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 10 +-
.../org/apache/hudi/hive/ddl/JDBCExecutor.java | 16 +-
.../hudi/hive/ddl/QueryBasedDDLExecutor.java | 34 ++--
.../hive/replication/GlobalHiveSyncConfig.java | 28 +--
.../hudi/hive/replication/GlobalHiveSyncTool.java | 2 +-
.../replication/HiveSyncGlobalCommitConfig.java | 12 +-
.../hive/replication/HiveSyncGlobalCommitTool.java | 2 +-
.../apache/hudi/hive/util/HivePartitionUtil.java | 12 +-
.../org/apache/hudi/hive/util/HiveSchemaUtil.java | 22 +-
.../hudi/hive/TestHiveSyncGlobalCommitTool.java | 24 +--
.../org/apache/hudi/hive/TestHiveSyncTool.java | 4 +-
.../testutils/HiveSyncFunctionalTestHarness.java | 28 +--
...SyncHoodieClient.java => HoodieSyncClient.java} | 109 +---------
.../apache/hudi/sync/common/HoodieSyncConfig.java | 88 ++++----
.../apache/hudi/sync/common/HoodieSyncTool.java | 49 +++++
.../apache/hudi/sync/common/SupportMetaSync.java | 7 +
.../hudi/sync/common/operation/CatalogSync.java | 32 +++
.../hudi/sync/common/operation/PartitionsSync.java | 11 +
.../sync/common/operation/ReplicatedTimeSync.java | 11 +
.../sync/common/operation/TblPropertiesSync.java | 13 ++
.../SparkDataSourceTableUtils.java} | 55 +----
.../hudi/sync/common/util/SyncUtilHelpers.java | 20 +-
.../hudi/sync/common/util/TestSyncUtilHelpers.java | 12 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 7 +-
.../functional/TestHoodieDeltaStreamer.java | 8 +-
.../utilities/testutils/UtilitiesTestBase.java | 26 +--
51 files changed, 855 insertions(+), 978 deletions(-)
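
The stat list above introduces a new package, org.apache.hudi.sync.common.operation, alongside HoodieSyncTool and HoodieSyncClient (the latter renamed from AbstractSyncHoodieClient). A minimal sketch of how the reworked hierarchy appears to compose, with member signatures inferred from the hunks below (an illustrative subset, not the complete interfaces):

import java.util.List;
import java.util.Map;

// Capability interfaces from the new operation package; each catalog client
// implements only the operations its metastore actually supports.
interface CatalogSync {
  // Illustrative member; the full interface is not shown in this diff.
  boolean tableExists(String tableName);
}

interface PartitionsSync {
  void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
  void updatePartitionsToTable(String tableName, List<String> changedPartitions);
  void dropPartitions(String tableName, List<String> partitionsToDrop);
}

interface TblPropertiesSync {
  void updateTableProperties(String tableName, Map<String, String> tableProperties);
}

// Common client state lives in the renamed base class; capabilities are
// opted into per client. Shapes taken from the hunks below:
//   class HoodieBigQuerySyncClient extends HoodieSyncClient
//       implements CatalogSync, TblPropertiesSync { ... }
//   abstract class AbstractAdbSyncHoodieClient extends HoodieSyncClient
//       implements PartitionsSync, CatalogSync, TblPropertiesSync { ... }
abstract class HoodieSyncClient {
}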
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index e15a698f27..e269908a4f 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -90,7 +90,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
super(syncConfig, hadoopConf, fs);
this.awsGlue = AWSGlueClientBuilder.standard().build();
- this.databaseName = syncConfig.databaseName;
+ this.databaseName = syncConfig.hoodieSyncConfigParams.databaseName;
}
@Override
@@ -126,7 +126,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
StorageDescriptor sd = table.getStorageDescriptor();
List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
- String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
StorageDescriptor sd = table.getStorageDescriptor();
List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
StorageDescriptor partitionSd = sd.clone();
- String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -206,10 +206,10 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
// ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
- boolean cascade = syncConfig.partitionFields.size() > 0;
+ boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
try {
Table table = getTable(awsGlue, databaseName, tableName);
- Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
+ Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
String keyType = getPartitionKeyType(newSchemaMap, key);
return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -265,26 +265,26 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
}
CreateTableRequest request = new CreateTableRequest();
Map<String, String> params = new HashMap<>();
- if (!syncConfig.createManagedTable) {
+ if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
params.put("EXTERNAL", "TRUE");
}
params.putAll(tableProperties);
try {
- Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+ Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
for (String key : mapSchema.keySet()) {
String keyType = getPartitionKeyType(mapSchema, key);
Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
// In Glue, the full schema should exclude the partition keys
- if (!syncConfig.partitionFields.contains(key)) {
+ if (!syncConfig.hoodieSyncConfigParams.partitionFields.contains(key)) {
schemaWithoutPartitionKeys.add(column);
}
}
// now create the schema partition
- List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+ List<Column> schemaPartitionKeys = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
String keyType = getPartitionKeyType(mapSchema, partitionKey);
return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
}).collect(Collectors.toList());
@@ -293,7 +293,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
serdeProperties.put("serialization.format", "1");
storageDescriptor
.withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
- .withLocation(s3aToS3(syncConfig.basePath))
+ .withLocation(s3aToS3(syncConfig.hoodieSyncConfigParams.basePath))
.withInputFormat(inputFormatClass)
.withOutputFormat(outputFormatClass)
.withColumns(schemaWithoutPartitionKeys);
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index bb1be377c9..ed5a49dbdb 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -58,11 +58,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
+ if (cfg.hiveSyncConfigParams.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
- FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 9fc5323d46..dee87e9966 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -52,7 +52,7 @@ public class HiveSyncContext {
}
public HiveSyncTool hiveSyncTool() {
- HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.syncMode);
+ HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.hiveSyncConfigParams.syncMode);
if (syncMode == HiveSyncMode.GLUE) {
return new AwsGlueCatalogSyncTool(this.syncConfig, this.hiveConf, this.fs);
}
@@ -76,28 +76,28 @@ public class HiveSyncContext {
@VisibleForTesting
public static HiveSyncConfig buildSyncConfig(Configuration conf) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
- hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH);
- hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
- hiveSyncConfig.usePreApacheInputFormat = false;
- hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
- hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
- hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
- hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
- hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
- hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
- hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
- hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
- hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
- hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
- hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
- hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
- hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
- hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
- hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
- hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
- hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
- hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
- hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+ hiveSyncConfig.hoodieSyncConfigParams.basePath = conf.getString(FlinkOptions.PATH);
+ hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
+ hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
+ hiveSyncConfig.hiveSyncConfigParams.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
+ hiveSyncConfig.hiveSyncConfigParams.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
+ hiveSyncConfig.hiveSyncConfigParams.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
+ hiveSyncConfig.hiveSyncConfigParams.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
+ hiveSyncConfig.hiveSyncConfigParams.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
+ hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
+ hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
+ hiveSyncConfig.hiveSyncConfigParams.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
+ hiveSyncConfig.hoodieSyncConfigParams.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
+ hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
+ hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
+ hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
+ hiveSyncConfig.hoodieSyncConfigParams.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
+ hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
+ hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
+ hiveSyncConfig.hiveSyncConfigParams.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
return hiveSyncConfig;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
index 7bfaade59e..2e8cc00bb1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
@@ -55,8 +55,8 @@ public class TestHiveSyncContext {
HiveSyncConfig hiveSyncConfig1 = HiveSyncContext.buildSyncConfig(configuration1);
HiveSyncConfig hiveSyncConfig2 = HiveSyncContext.buildSyncConfig(configuration2);
- assertTrue(hiveSyncConfig1.partitionFields.get(0).equals(hiveSyncPartitionField));
- assertTrue(hiveSyncConfig2.partitionFields.get(0).equals(partitionPathField));
+ assertTrue(hiveSyncConfig1.hoodieSyncConfigParams.partitionFields.get(0).equals(hiveSyncPartitionField));
+ assertTrue(hiveSyncConfig2.hoodieSyncConfigParams.partitionFields.get(0).equals(partitionPathField));
}
}
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 0cb75eea89..b6904a6887 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.util.ManifestFileWriter;
import com.beust.jcommander.JCommander;
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
*
* @Experimental
*/
-public class BigQuerySyncTool extends AbstractSyncTool {
+public class BigQuerySyncTool extends HoodieSyncTool {
private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index cb41ca2272..c5222aca91 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -20,7 +20,7 @@
package org.apache.hudi.gcp.bigquery;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
@@ -39,6 +39,8 @@ import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.ViewDefinition;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -47,7 +49,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
+public class HoodieBigQuerySyncClient extends HoodieSyncClient implements CatalogSync, TblPropertiesSync {
private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class);
private final BigQuerySyncConfig syncConfig;
@@ -171,12 +173,6 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
return Collections.emptyMap();
}
- @Override
- public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
- // bigQuery discovers the new partitions automatically, so do nothing.
- throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
- }
-
public boolean datasetExists() {
Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
return dataset != null;
@@ -207,33 +203,8 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
}
@Override
- public Option<String> getLastReplicatedTime(String tableName) {
- // bigQuery doesn't support tblproperties, so do nothing.
- throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
- }
-
- @Override
- public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
- // bigQuery doesn't support tblproperties, so do nothing.
- throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
- }
-
- @Override
- public void deleteLastReplicatedTimeStamp(String tableName) {
- // bigQuery doesn't support tblproperties, so do nothing.
- throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
- }
-
- @Override
- public void updatePartitionsToTable(final String tableName, final List<String> changedPartitions) {
- // bigQuery updates the partitions automatically, so do nothing.
- throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
- }
-
- @Override
- public void dropPartitions(String tableName, List<String> partitionsToDrop) {
- // bigQuery discovers the new partitions automatically, so do nothing.
- throw new UnsupportedOperationException("No support for dropPartitions yet.");
+ public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+ throw new UnsupportedOperationException("No support for updateTableProperties yet.");
}
@Override
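
With the partition and tblproperties operations split into interfaces, the BigQuery client above no longer stubs out five inherited methods just to throw; it picks up only CatalogSync and TblPropertiesSync. A hedged caller-side sketch of why that matters — hypothetical orchestration code, not part of this commit — where the driver capability-checks instead of catching UnsupportedOperationException:

import java.util.List;

interface PartitionsSync {
  void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
}

class SyncDriver {
  // Hypothetical helper (not in this commit): partitions are synced only for
  // clients that declare the capability. Clients like HoodieBigQuerySyncClient,
  // which let BigQuery discover partitions on its own, never implement the
  // interface and simply skip this branch.
  static void addPartitionsIfSupported(Object client, String table, List<String> parts) {
    if (client instanceof PartitionsSync) {
      ((PartitionsSync) client).addPartitionsToTable(table, parts);
    }
  }
}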
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 4736133f2c..8aef7bdf81 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -57,8 +57,8 @@ public class HiveQueryNode extends DagNode<Boolean> {
.getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
- Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
- hiveSyncConfig.hivePass);
+ Connection con = DriverManager.getConnection(hiveSyncConfig.hiveSyncConfigParams.jdbcUrl, hiveSyncConfig.hiveSyncConfigParams.hiveUser,
+ hiveSyncConfig.hiveSyncConfigParams.hivePass);
Statement stmt = con.createStatement();
stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
for (String hiveProperty : this.config.getHiveProperties()) {
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 934dbadf1c..e351fe14c3 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.sync.common.SupportMetaSync;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hadoop.conf.Configuration;
@@ -56,7 +57,7 @@ import java.util.Set;
* {@link TransactionCoordinator}
* using {@link HoodieJavaWriteClient}.
*/
-public class KafkaConnectTransactionServices implements ConnectTransactionServices {
+public class KafkaConnectTransactionServices implements ConnectTransactionServices, SupportMetaSync {
private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7..64b8079a64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -279,50 +279,43 @@ public class DataSourceUtils {
public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE().key()));
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
- hiveSyncConfig.basePath = basePath;
- hiveSyncConfig.usePreApacheInputFormat =
- props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
+ hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+ hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
Boolean.parseBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().defaultValue()));
- hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
- DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
- hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
- hiveSyncConfig.baseFileFormat = baseFileFormat;
- hiveSyncConfig.hiveUser =
- props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
- hiveSyncConfig.hivePass =
- props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
- hiveSyncConfig.jdbcUrl =
- props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
- hiveSyncConfig.metastoreUris =
- props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
- hiveSyncConfig.partitionFields =
- props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
- hiveSyncConfig.partitionValueExtractorClass =
- props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
+ DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
+ hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = baseFileFormat;
+ hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
+ hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
+ hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
+ hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
+ hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
SlashEncodedDayPartitionValueExtractor.class.getName());
- hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
- DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
+ hiveSyncConfig.hiveSyncConfigParams.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
+ DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) {
- hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
+ hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
}
- hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
- DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
- hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
- DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
- hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
- DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
- hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
- DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
- hiveSyncConfig.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
- DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
- hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
- DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
- ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+ hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
+ DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
+ hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
+ DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
+ hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
+ DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
+ hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
+ DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
+ hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
+ DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
+ hiveSyncConfig.hiveSyncConfigParams.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
+ DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
+ ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
- hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
+ hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
}
- hiveSyncConfig.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
+ hiveSyncConfig.hiveSyncConfigParams.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
DataSourceWriteOptions.HIVE_SYNC_COMMENT().defaultValue()));
return hiveSyncConfig;
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 3f67d5017f..cb2057a995 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -77,12 +77,12 @@ trait ProvidesHoodieConfig extends Logging {
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
- HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
)
@@ -195,11 +195,11 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
- HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
@@ -232,12 +232,12 @@ trait ProvidesHoodieConfig extends Logging {
PARTITIONPATH_FIELD.key -> partitionFields,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
- HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
- HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
- HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass
)
.filter { case (_, v) => v != null }
}
@@ -274,8 +274,8 @@ trait ProvidesHoodieConfig extends Logging {
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
- HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
@@ -290,32 +290,32 @@ trait ProvidesHoodieConfig extends Logging {
def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
- hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation
- hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat
- hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
- hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
+ hiveSyncConfig.hoodieSyncConfigParams.basePath = hoodieCatalogTable.tableLocation
+ hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = hoodieCatalogTable.baseFileFormat
+ hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) {
- hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
} else {
- hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = hoodieCatalogTable.table.identifier.table
}
- hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
- hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
- hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
- hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
- hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
- hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
- hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
- if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
- hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
- hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
- hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
- hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
- hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
- hiveSyncConfig.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
+ hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
+ hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
+ hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
+ hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
+ hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
+ hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
+ if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
+ hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
+ hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
+ hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
+ hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
+ hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
+ hiveSyncConfig.hiveSyncConfigParams.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
else null
- if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
- hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
+ if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
+ hiveSyncConfig.hiveSyncConfigParams.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
hiveSyncConfig
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 636599ce0c..296b359e90 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -467,13 +467,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
- HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
- HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
- HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index af5bbe7717..0a1f08d048 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -261,14 +261,14 @@ public class TestDataSourceUtils {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name());
if (useSyncMode) {
- assertFalse(hiveSyncConfig.useJdbc);
- assertEquals(HMS.name(), hiveSyncConfig.syncMode);
+ assertFalse(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+ assertEquals(HMS.name(), hiveSyncConfig.hiveSyncConfigParams.syncMode);
} else {
- assertTrue(hiveSyncConfig.useJdbc);
- assertNull(hiveSyncConfig.syncMode);
+ assertTrue(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+ assertNull(hiveSyncConfig.hiveSyncConfigParams.syncMode);
}
- assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName);
- assertEquals(HIVE_TABLE, hiveSyncConfig.tableName);
+ assertEquals(HIVE_DATABASE, hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+ assertEquals(HIVE_TABLE, hiveSyncConfig.hoodieSyncConfigParams.tableName);
}
private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
index 84316ddb11..b8dec82b7b 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
@@ -24,26 +24,29 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hive.PartitionValueExtractor;
import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractAdbSyncHoodieClient extends HoodieSyncClient implements PartitionsSync, CatalogSync, TblPropertiesSync {
protected AdbSyncConfig adbSyncConfig;
protected PartitionValueExtractor partitionValueExtractor;
protected HoodieTimeline activeTimeline;
public AbstractAdbSyncHoodieClient(AdbSyncConfig syncConfig, FileSystem fs) {
- super(syncConfig.basePath, syncConfig.assumeDatePartitioning,
- syncConfig.useFileListingFromMetadata, false, fs);
+ super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+ syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
this.adbSyncConfig = syncConfig;
- final String clazz = adbSyncConfig.partitionValueExtractorClass;
+ final String clazz = adbSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass;
try {
this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(clazz).newInstance();
} catch (Exception e) {
@@ -64,13 +67,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
}
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
- Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, storagePartition);
+ Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- if (adbSyncConfig.useHiveStylePartitioning) {
+ if (adbSyncConfig.adbSyncConfigParams.useHiveStylePartitioning) {
String partition = String.join("/", storagePartitionValues);
- storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+ storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
}
if (!storagePartitionValues.isEmpty()) {
@@ -100,13 +103,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
public abstract void dropTable(String tableName);
protected String getDatabasePath() {
- String dbLocation = adbSyncConfig.dbLocation;
+ String dbLocation = adbSyncConfig.adbSyncConfigParams.dbLocation;
Path dbLocationPath;
if (StringUtils.isNullOrEmpty(dbLocation)) {
- if (new Path(adbSyncConfig.basePath).isRoot()) {
- dbLocationPath = new Path(adbSyncConfig.basePath);
+ if (new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).isRoot()) {
+ dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
} else {
- dbLocationPath = new Path(adbSyncConfig.basePath).getParent();
+ dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).getParent();
}
} else {
dbLocationPath = new Path(dbLocation);
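
The next diff (AdbSyncConfig.java) replaces the inline @Parameter fields with a nested params object and adds an import for com.beust.jcommander.ParametersDelegate. A minimal sketch of that JCommander composition pattern under the same naming, with an illustrative subset of fields (the annotation placement is inferred from the added import; it is not visible in the hunks):

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;

class HoodieSyncConfigParams {
  // Illustrative subset of the shared meta sync params.
  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table")
  public String basePath;

  @Parameter(names = {"--database"}, description = "Database name")
  public String databaseName;
}

class DemoSyncConfig {
  // JCommander flattens delegate fields into the parent's CLI options, which
  // is what lets the sync configs reuse whole param groups by composition.
  @ParametersDelegate
  public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
}

class Demo {
  public static void main(String[] args) {
    DemoSyncConfig cfg = new DemoSyncConfig();
    new JCommander(cfg, null, args); // same constructor form used in the hunks above
    System.out.println(cfg.hoodieSyncConfigParams.basePath);
  }
}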
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
index ae2e7024e5..a9a1c847d5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
@@ -18,64 +18,22 @@
package org.apache.hudi.sync.adb;
+import com.beust.jcommander.ParametersDelegate;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.beust.jcommander.Parameter;
+import java.io.Serializable;
+
/**
* Configs needed to sync data into Alibaba Cloud AnalyticDB(ADB).
*/
public class AdbSyncConfig extends HoodieSyncConfig {
- @Parameter(names = {"--user"}, description = "Adb username", required = true)
- public String adbUser;
-
- @Parameter(names = {"--pass"}, description = "Adb password", required = true)
- public String adbPass;
-
- @Parameter(names = {"--jdbc-url"}, description = "Adb jdbc connect url", required = true)
- public String jdbcUrl;
-
- @Parameter(names = {"--skip-ro-suffix"}, description = "Whether skip the `_ro` suffix for read optimized table when syncing")
- public Boolean skipROSuffix;
-
- @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
- public Boolean skipRTSync;
-
- @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
- public Boolean useHiveStylePartitioning;
-
- @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
- public Boolean supportTimestamp;
-
- @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
- public Boolean syncAsSparkDataSourceTable;
-
- @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
- public String tableProperties;
-
- @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
- public String serdeProperties;
-
- @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
- public int sparkSchemaLengthThreshold;
-
- @Parameter(names = {"--db-location"}, description = "Database location")
- public String dbLocation;
-
- @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
- public Boolean autoCreateDatabase = true;
-
- @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
- public Boolean skipLastCommitTimeSync = false;
-
- @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
- public Boolean dropTableBeforeCreation = false;
-
- @Parameter(names = {"--help", "-h"}, help = true)
- public Boolean help = false;
+ public final AdbSyncConfigParams adbSyncConfigParams = new AdbSyncConfigParams();
public static final ConfigProperty<String> ADB_SYNC_USER = ConfigProperty
.key("hoodie.datasource.adb.sync.username")
@@ -159,48 +117,48 @@ public class AdbSyncConfig extends HoodieSyncConfig {
public AdbSyncConfig(TypedProperties props) {
super(props);
- adbUser = getString(ADB_SYNC_USER);
- adbPass = getString(ADB_SYNC_PASS);
- jdbcUrl = getString(ADB_SYNC_JDBC_URL);
- skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
- skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
- useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
- supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
- syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
- tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
- serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
- sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
- dbLocation = getString(ADB_SYNC_DB_LOCATION);
- autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
- skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
- dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
+ adbSyncConfigParams.hiveSyncConfigParams.hiveUser = getString(ADB_SYNC_USER);
+ adbSyncConfigParams.hiveSyncConfigParams.hivePass = getString(ADB_SYNC_PASS);
+ adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = getString(ADB_SYNC_JDBC_URL);
+ adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
+ adbSyncConfigParams.skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
+ adbSyncConfigParams.useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
+ adbSyncConfigParams.supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
+ adbSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
+ adbSyncConfigParams.tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
+ adbSyncConfigParams.serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
+ adbSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+ adbSyncConfigParams.dbLocation = getString(ADB_SYNC_DB_LOCATION);
+ adbSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
+ adbSyncConfigParams.skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
+ adbSyncConfigParams.dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
}
public static TypedProperties toProps(AdbSyncConfig cfg) {
TypedProperties properties = new TypedProperties();
- properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName);
- properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName);
- properties.put(ADB_SYNC_USER.key(), cfg.adbUser);
- properties.put(ADB_SYNC_PASS.key(), cfg.adbPass);
- properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl);
- properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
- properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields));
- properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass);
- properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning));
- properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix));
- properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync));
- properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning));
- properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata));
- properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp));
- properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties);
- properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties);
- properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable));
- properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold));
- properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion);
- properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation);
- properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase));
- properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync));
- properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation));
+ properties.put(META_SYNC_DATABASE_NAME.key(), cfg.hoodieSyncConfigParams.databaseName);
+ properties.put(META_SYNC_TABLE_NAME.key(), cfg.hoodieSyncConfigParams.tableName);
+ properties.put(ADB_SYNC_USER.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+ properties.put(ADB_SYNC_PASS.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+ properties.put(ADB_SYNC_JDBC_URL.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+ properties.put(META_SYNC_BASE_PATH.key(), cfg.hoodieSyncConfigParams.basePath);
+ properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.hoodieSyncConfigParams.partitionFields));
+ properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.hoodieSyncConfigParams.partitionValueExtractorClass);
+ properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.hoodieSyncConfigParams.assumeDatePartitioning));
+ properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix));
+ properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipRTSync));
+ properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.adbSyncConfigParams.useHiveStylePartitioning));
+ properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.hoodieSyncConfigParams.useFileListingFromMetadata));
+ properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.adbSyncConfigParams.supportTimestamp));
+ properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.adbSyncConfigParams.tableProperties);
+ properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.adbSyncConfigParams.serdeProperties);
+ properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.adbSyncConfigParams.syncAsSparkDataSourceTable));
+ properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.adbSyncConfigParams.sparkSchemaLengthThreshold));
+ properties.put(META_SYNC_SPARK_VERSION.key(), cfg.hoodieSyncConfigParams.sparkVersion);
+ properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.adbSyncConfigParams.dbLocation);
+ properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.adbSyncConfigParams.autoCreateDatabase));
+ properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipLastCommitTimeSync));
+ properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.adbSyncConfigParams.dropTableBeforeCreation));
return properties;
}
@@ -208,33 +166,66 @@ public class AdbSyncConfig extends HoodieSyncConfig {
@Override
public String toString() {
return "AdbSyncConfig{"
- + "adbUser='" + adbUser + '\''
- + ", adbPass='" + adbPass + '\''
- + ", jdbcUrl='" + jdbcUrl + '\''
- + ", skipROSuffix=" + skipROSuffix
- + ", skipRTSync=" + skipRTSync
- + ", useHiveStylePartitioning=" + useHiveStylePartitioning
- + ", supportTimestamp=" + supportTimestamp
- + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
- + ", tableProperties='" + tableProperties + '\''
- + ", serdeProperties='" + serdeProperties + '\''
- + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
- + ", dbLocation='" + dbLocation + '\''
- + ", autoCreateDatabase=" + autoCreateDatabase
- + ", skipLastCommitTimeSync=" + skipLastCommitTimeSync
- + ", dropTableBeforeCreation=" + dropTableBeforeCreation
- + ", help=" + help
- + ", databaseName='" + databaseName + '\''
- + ", tableName='" + tableName + '\''
- + ", basePath='" + basePath + '\''
- + ", baseFileFormat='" + baseFileFormat + '\''
- + ", partitionFields=" + partitionFields
- + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
- + ", assumeDatePartitioning=" + assumeDatePartitioning
- + ", decodePartition=" + decodePartition
- + ", useFileListingFromMetadata=" + useFileListingFromMetadata
- + ", isConditionalSync=" + isConditionalSync
- + ", sparkVersion='" + sparkVersion + '\''
+ + "adbUser='" + adbSyncConfigParams.hiveSyncConfigParams.hiveUser + '\''
+ + ", adbPass='" + adbSyncConfigParams.hiveSyncConfigParams.hivePass + '\''
+ + ", jdbcUrl='" + adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl + '\''
+ + ", skipROSuffix=" + adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix
+ + ", skipRTSync=" + adbSyncConfigParams.skipRTSync
+ + ", useHiveStylePartitioning=" + adbSyncConfigParams.useHiveStylePartitioning
+ + ", supportTimestamp=" + adbSyncConfigParams.supportTimestamp
+ + ", syncAsSparkDataSourceTable=" + adbSyncConfigParams.syncAsSparkDataSourceTable
+ + ", tableProperties='" + adbSyncConfigParams.tableProperties + '\''
+ + ", serdeProperties='" + adbSyncConfigParams.serdeProperties + '\''
+ + ", sparkSchemaLengthThreshold=" + adbSyncConfigParams.sparkSchemaLengthThreshold
+ + ", dbLocation='" + adbSyncConfigParams.dbLocation + '\''
+ + ", autoCreateDatabase=" + adbSyncConfigParams.autoCreateDatabase
+ + ", skipLastCommitTimeSync=" + adbSyncConfigParams.skipLastCommitTimeSync
+ + ", dropTableBeforeCreation=" + adbSyncConfigParams.dropTableBeforeCreation
+ + ", help=" + adbSyncConfigParams.help
+ + ", databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+ + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+ + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+ + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+ + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+ + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+ + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+ + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+ + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+ + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+ + ", sparkVersion='" + hoodieSyncConfigParams.sparkVersion + '\''
+ '}';
}
+
+ public static class AdbSyncConfigParams implements Serializable {
+ @ParametersDelegate()
+ public HiveSyncConfig.HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfig.HiveSyncConfigParams();
+
+ @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
+ public Boolean supportTimestamp;
+ @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
+ public Boolean syncAsSparkDataSourceTable;
+ @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
+ public String tableProperties;
+ @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
+ public String serdeProperties;
+ @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
+ public int sparkSchemaLengthThreshold;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+ @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
+ public Boolean useHiveStylePartitioning;
+ @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
+ public Boolean skipRTSync;
+ @Parameter(names = {"--db-location"}, description = "Database location")
+ public String dbLocation;
+ @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
+ public Boolean autoCreateDatabase = true;
+ @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
+ public Boolean skipLastCommitTimeSync = false;
+ @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
+ public Boolean dropTableBeforeCreation = false;
+
+ public AdbSyncConfigParams() {
+ }
+ }
}
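The move of AdbSyncConfig's flat @Parameter fields into the nested AdbSyncConfigParams object works because JCommander supports composing parameter objects through @ParametersDelegate: options declared on the delegate are parsed as if they were declared on the enclosing object. A minimal, self-contained sketch of the mechanism, with illustrative class and option names rather than the ones from this patch:

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;
    import com.beust.jcommander.ParametersDelegate;

    public class DelegateDemo {
      // Shared options, analogous to HiveSyncConfigParams being reused by the ADB params.
      public static class CommonParams {
        @Parameter(names = {"--jdbc-url"}, description = "JDBC connect url")
        public String jdbcUrl;
      }

      // Tool-specific options compose the shared ones via delegation.
      public static class ToolParams {
        @ParametersDelegate
        public CommonParams common = new CommonParams();

        @Parameter(names = {"--help", "-h"}, help = true)
        public boolean help = false;
      }

      public static void main(String[] args) {
        ToolParams params = new ToolParams();
        // Same constructor form as AdbSyncTool.main below.
        new JCommander(params, null, args);
        // --jdbc-url is accepted even though it is declared on the delegate.
        System.out.println("jdbcUrl=" + params.common.jdbcUrl);
      }
    }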
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 8c2f9e2045..fb9911ace5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.SchemaDifference;
import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.util.ConfigUtils;
import com.beust.jcommander.JCommander;
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ import java.util.stream.Collectors;
* incremental partitions will be synced as well.
*/
@SuppressWarnings("WeakerAccess")
-public class AdbSyncTool extends AbstractSyncTool {
+public class AdbSyncTool extends HoodieSyncTool {
private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -72,13 +73,13 @@ public class AdbSyncTool extends AbstractSyncTool {
this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs);
switch (hoodieAdbClient.getTableType()) {
case COPY_ON_WRITE:
- this.snapshotTableName = adbSyncConfig.tableName;
+ this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName;
this.roTableTableName = Option.empty();
break;
case MERGE_ON_READ:
- this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
- this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName)
- : Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+ this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+ this.roTableTableName = adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix ? Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName)
+ : Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
@@ -101,7 +102,7 @@ public class AdbSyncTool extends AbstractSyncTool {
// Sync a ro table for MOR table
syncHoodieTable(roTableTableName.get(), false, true);
// Sync a rt table for MOR table
- if (!adbSyncConfig.skipRTSync) {
+ if (!adbSyncConfig.adbSyncConfigParams.skipRTSync) {
syncHoodieTable(snapshotTableName, true, false);
}
break;
@@ -110,7 +111,7 @@ public class AdbSyncTool extends AbstractSyncTool {
+ ", basePath:" + hoodieAdbClient.getBasePath());
}
} catch (Exception re) {
- throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re);
+ throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.hoodieSyncConfigParams.tableName, re);
} finally {
hoodieAdbClient.close();
}
@@ -121,19 +122,19 @@ public class AdbSyncTool extends AbstractSyncTool {
LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType());
- if (adbSyncConfig.autoCreateDatabase) {
+ if (adbSyncConfig.adbSyncConfigParams.autoCreateDatabase) {
try {
synchronized (AdbSyncTool.class) {
- if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
- hoodieAdbClient.createDatabase(adbSyncConfig.databaseName);
+ if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+ hoodieAdbClient.createDatabase(adbSyncConfig.hoodieSyncConfigParams.databaseName);
}
}
} catch (Exception e) {
- throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName
+ throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.hoodieSyncConfigParams.databaseName
+ ", useRealtimeInputFormat = " + useRealtimeInputFormat, e);
}
- } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
- throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName);
+ } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+ throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.hoodieSyncConfigParams.databaseName);
}
// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt table,
@@ -144,11 +145,11 @@ public class AdbSyncTool extends AbstractSyncTool {
if (hoodieAdbClient.isBootstrap()
&& hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
- adbSyncConfig.syncAsSparkDataSourceTable = false;
+ adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable = false;
LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName);
}
- if (adbSyncConfig.dropTableBeforeCreation) {
+ if (adbSyncConfig.adbSyncConfigParams.dropTableBeforeCreation) {
LOG.info("Drop table before creation, tableName:{}", tableName);
hoodieAdbClient.dropTable(tableName);
}
@@ -171,7 +172,7 @@ public class AdbSyncTool extends AbstractSyncTool {
// Scan synced partitions
List<String> writtenPartitionsSince;
- if (adbSyncConfig.partitionFields.isEmpty()) {
+ if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
@@ -183,7 +184,7 @@ public class AdbSyncTool extends AbstractSyncTool {
// Update sync commit time
// whether to skip syncing commit time stored in tbl properties, since it is time consuming.
- if (!adbSyncConfig.skipLastCommitTimeSync) {
+ if (!adbSyncConfig.adbSyncConfigParams.skipLastCommitTimeSync) {
hoodieAdbClient.updateLastCommitTimeSynced(tableName);
}
LOG.info("Sync complete for table:{}", tableName);
@@ -202,12 +203,12 @@ public class AdbSyncTool extends AbstractSyncTool {
private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) throws Exception {
// Append spark table properties & serde properties
- Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties);
- Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties);
- if (adbSyncConfig.syncAsSparkDataSourceTable) {
- Map<String, String> sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields,
- adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema);
- Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath);
+ Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.tableProperties);
+ Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.serdeProperties);
+ if (adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable) {
+ Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+ adbSyncConfig.hoodieSyncConfigParams.sparkVersion, adbSyncConfig.adbSyncConfigParams.sparkSchemaLengthThreshold, schema);
+ Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, adbSyncConfig.hoodieSyncConfigParams.basePath);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}",
@@ -227,8 +228,8 @@ public class AdbSyncTool extends AbstractSyncTool {
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieAdbClient.getTableSchema(tableName);
- SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields,
- adbSyncConfig.supportTimestamp);
+ SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+ adbSyncConfig.adbSyncConfigParams.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for table:{}", tableName);
hoodieAdbClient.updateTableDefinition(tableName, schemaDiff);
@@ -244,7 +245,7 @@ public class AdbSyncTool extends AbstractSyncTool {
*/
private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
try {
- if (adbSyncConfig.partitionFields.isEmpty()) {
+ if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
LOG.info("Not a partitioned table.");
return;
}
@@ -271,13 +272,13 @@ public class AdbSyncTool extends AbstractSyncTool {
// parse the params
final AdbSyncConfig cfg = new AdbSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
+ if (cfg.adbSyncConfigParams.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
Configuration hadoopConf = new Configuration();
- FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
+ FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, hadoopConf);
new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
}
}
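syncSchema above runs the raw --table-properties / --serde-properties strings through ConfigUtils.toMap before layering on the Spark datasource properties. As a hedged sketch, assuming the newline-separated key=value format that TestAdbSyncConfig exercises (an illustration of the parsing idea, not the actual ConfigUtils implementation):

    import java.util.HashMap;
    import java.util.Map;

    public class PropsStringParser {
      // Parse "k1=v1\nk2=v2" style strings, as passed via --table-properties.
      public static Map<String, String> toMap(String props) {
        Map<String, String> result = new HashMap<>();
        if (props == null || props.trim().isEmpty()) {
          return result;
        }
        for (String line : props.split("\n")) {
          if (line.trim().isEmpty()) {
            continue;
          }
          String[] kv = line.split("=", 2);
          result.put(kv[0].trim(), kv.length > 1 ? kv[1].trim() : "");
        }
        return result;
      }
    }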
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index a347ba7011..81bd34ffa3 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -69,7 +69,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) {
super(syncConfig, fs);
createAdbConnection();
- LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl);
+ LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
}
private void createAdbConnection() {
@@ -82,7 +82,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
}
try {
this.connection = DriverManager.getConnection(
- adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass);
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl,
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
} catch (SQLException e) {
throw new HoodieException("Cannot create adb connection ", e);
}
@@ -106,7 +107,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
@Override
public void dropTable(String tableName) {
LOG.info("Dropping table:{}", tableName);
- String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`";
+ String dropTable = "drop table if exists `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`.`" + tableName + "`";
executeAdbSql(dropTable);
}
@@ -115,8 +116,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
- result = databaseMetaData.getColumns(adbSyncConfig.databaseName,
- adbSyncConfig.databaseName, tableName, null);
+ result = databaseMetaData.getColumns(adbSyncConfig.hoodieSyncConfigParams.databaseName,
+ adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
@@ -261,18 +262,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
}
@Override
- public Option<String> getLastReplicatedTime(String tableName) {
- throw new UnsupportedOperationException("Not support getLastReplicatedTime yet");
- }
-
- @Override
- public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
- throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet");
- }
-
- @Override
- public void deleteLastReplicatedTimeStamp(String tableName) {
- throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet");
+ public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+ throw new UnsupportedOperationException("updateTableProperties is not supported yet");
}
@Override
@@ -304,7 +295,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
String str = resultSet.getString(1);
if (!StringUtils.isNullOrEmpty(str)) {
List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
- Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values));
+ Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, String.join("/", values));
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
partitions.put(values, fullStoragePartitionPath);
}
@@ -332,11 +323,11 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
private String constructAddPartitionsSql(String tableName, List<String> partitions) {
StringBuilder sqlBuilder = new StringBuilder("alter table `");
- sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`")
+ sqlBuilder.append(adbSyncConfig.hoodieSyncConfigParams.databaseName).append("`").append(".`")
.append(tableName).append("`").append(" add if not exists ");
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
- Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+ Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
sqlBuilder.append(" partition (").append(partitionClause).append(") location '")
.append(fullPartitionPathStr).append("' ");
@@ -347,13 +338,13 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
private List<String> constructChangePartitionsSql(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
- String useDatabase = "use `" + adbSyncConfig.databaseName + "`";
+ String useDatabase = "use `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`";
changePartitions.add(useDatabase);
String alterTable = "alter table `" + tableName + "`";
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
- Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+ Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
String changePartition = alterTable + " add if not exists partition (" + partitionClause
+ ") location '" + fullPartitionPathStr + "'";
@@ -371,32 +362,32 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
*/
private String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
- ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(),
- "Partition key parts " + adbSyncConfig.partitionFields
+ ValidationUtils.checkArgument(adbSyncConfig.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+ "Partition key parts " + adbSyncConfig.hoodieSyncConfigParams.partitionFields
+ " does not match with partition values " + partitionValues + ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
- for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) {
- partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+ for (int i = 0; i < adbSyncConfig.hoodieSyncConfigParams.partitionFields.size(); i++) {
+ partBuilder.add(adbSyncConfig.hoodieSyncConfigParams.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
}
return String.join(",", partBuilder);
}
private String constructShowPartitionSql(String tableName) {
- return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+ return String.format("show partitions `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
}
private String constructShowCreateTableSql(String tableName) {
- return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+ return String.format("show create table `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
}
private String constructShowLikeTableSql(String tableName) {
- return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName);
+ return String.format("show tables from `%s` like '%s'", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
}
private String constructCreateDatabaseSql(String rootPath) {
return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')",
- adbSyncConfig.databaseName, rootPath);
+ adbSyncConfig.hoodieSyncConfigParams.databaseName, rootPath);
}
private String constructShowCreateDatabaseSql(String databaseName) {
@@ -405,25 +396,25 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
- adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
+ adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
}
private String constructAddColumnSql(String tableName, String columnName, String columnType) {
return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
- adbSyncConfig.databaseName, tableName, columnName, columnType);
+ adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnType);
}
private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
- adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
+ adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnName, columnType);
}
private HiveSyncConfig getHiveSyncConfig() {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
- hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
- hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
- Path basePath = new Path(adbSyncConfig.basePath);
- hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = adbSyncConfig.hoodieSyncConfigParams.partitionFields;
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = adbSyncConfig.hoodieSyncConfigParams.databaseName;
+ Path basePath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
+ hiveSyncConfig.hoodieSyncConfigParams.basePath = generateAbsolutePathStr(basePath);
return hiveSyncConfig;
}
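All of the construct*Sql helpers above produce plain DDL strings that the client executes over its JDBC connection. A minimal sketch of that execute-DDL pattern (placeholder URL and credentials; the real client reuses the single connection created in createAdbConnection rather than opening one per statement):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class DdlRunner {
      public static void executeDdl(String jdbcUrl, String user, String pass, String ddl) {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, pass);
             Statement stmt = conn.createStatement()) {
          stmt.execute(ddl);
        } catch (SQLException e) {
          throw new RuntimeException("Failed executing DDL: " + ddl, e);
        }
      }

      public static void main(String[] args) {
        // Example shaped like constructAddPartitionsSql's output.
        executeDdl("jdbc:mysql://localhost:3306", "adb", "adb",
            "alter table `test`.`tbl` add if not exists partition (dt='2022-06-13') location '/tmp/tbl/dt=2022-06-13'");
      }
    }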
diff --git a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
index f4eb8fc7fc..0a3d937496 100644
--- a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
@@ -30,36 +30,36 @@ public class TestAdbSyncConfig {
@Test
public void testCopy() {
AdbSyncConfig adbSyncConfig = new AdbSyncConfig();
- adbSyncConfig.partitionFields = Arrays.asList("a", "b");
- adbSyncConfig.basePath = "/tmp";
- adbSyncConfig.assumeDatePartitioning = true;
- adbSyncConfig.databaseName = "test";
- adbSyncConfig.tableName = "test";
- adbSyncConfig.adbUser = "adb";
- adbSyncConfig.adbPass = "adb";
- adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
- adbSyncConfig.skipROSuffix = false;
- adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
- + "spark.sql.sources.schema.numParts = '1'\\n "
- + "spark.sql.sources.schema.part.0 ='xx'\\n "
- + "spark.sql.sources.schema.numPartCols = '1'\\n"
- + "spark.sql.sources.schema.partCol.0 = 'dt'";
- adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'";
- adbSyncConfig.dbLocation = "file://tmp/test_db";
+ adbSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList("a", "b");
+ adbSyncConfig.hoodieSyncConfigParams.basePath = "/tmp";
+ adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = true;
+ adbSyncConfig.hoodieSyncConfigParams.databaseName = "test";
+ adbSyncConfig.hoodieSyncConfigParams.tableName = "test";
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser = "adb";
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass = "adb";
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = "jdbc:mysql://localhost:3306";
+ adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = false;
+ adbSyncConfig.adbSyncConfigParams.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
+ + "spark.sql.sources.schema.numParts = '1'\\n "
+ + "spark.sql.sources.schema.part.0 ='xx'\\n "
+ + "spark.sql.sources.schema.numPartCols = '1'\\n"
+ + "spark.sql.sources.schema.partCol.0 = 'dt'";
+ adbSyncConfig.adbSyncConfigParams.serdeProperties = "'path'='/tmp/test_db/tbl'";
+ adbSyncConfig.adbSyncConfigParams.dbLocation = "file://tmp/test_db";
TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig);
AdbSyncConfig copied = new AdbSyncConfig(props);
- assertEquals(copied.partitionFields, adbSyncConfig.partitionFields);
- assertEquals(copied.basePath, adbSyncConfig.basePath);
- assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning);
- assertEquals(copied.databaseName, adbSyncConfig.databaseName);
- assertEquals(copied.tableName, adbSyncConfig.tableName);
- assertEquals(copied.adbUser, adbSyncConfig.adbUser);
- assertEquals(copied.adbPass, adbSyncConfig.adbPass);
- assertEquals(copied.basePath, adbSyncConfig.basePath);
- assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl);
- assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix);
- assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp);
+ assertEquals(copied.hoodieSyncConfigParams.partitionFields, adbSyncConfig.hoodieSyncConfigParams.partitionFields);
+ assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+ assertEquals(copied.hoodieSyncConfigParams.assumeDatePartitioning, adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning);
+ assertEquals(copied.hoodieSyncConfigParams.databaseName, adbSyncConfig.hoodieSyncConfigParams.databaseName);
+ assertEquals(copied.hoodieSyncConfigParams.tableName, adbSyncConfig.hoodieSyncConfigParams.tableName);
+ assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+ assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hivePass, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+ assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+ assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+ assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix);
+ assertEquals(copied.adbSyncConfigParams.supportTimestamp, adbSyncConfig.adbSyncConfigParams.supportTimestamp);
}
}
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 68569822cc..e784d591d6 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -23,8 +23,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.HoodieSyncException;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
import com.linkedin.common.urn.DatasetUrn;
@@ -53,14 +54,13 @@ import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.parquet.schema.MessageType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class DataHubSyncClient extends AbstractSyncHoodieClient {
+public class DataHubSyncClient extends HoodieSyncClient implements TblPropertiesSync {
private final HoodieTimeline activeTimeline;
private final DataHubSyncConfig syncConfig;
@@ -68,34 +68,13 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
private final DatasetUrn datasetUrn;
public DataHubSyncClient(DataHubSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
- super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, false, fs);
+ super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning, syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
this.syncConfig = syncConfig;
this.hadoopConf = hadoopConf;
this.datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
- @Override
- public void createTable(String tableName,
- MessageType storageSchema,
- String inputFormatClass,
- String outputFormatClass,
- String serdeClass,
- Map<String, String> serdeProperties,
- Map<String, String> tableProperties) {
- throw new UnsupportedOperationException("Not supported: `createTable`");
- }
-
- @Override
- public boolean doesTableExist(String tableName) {
- return tableExists(tableName);
- }
-
- @Override
- public boolean tableExists(String tableName) {
- throw new UnsupportedOperationException("Not supported: `tableExists`");
- }
-
@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
@@ -106,36 +85,6 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, activeTimeline.lastInstant().get().getTimestamp()));
}
- @Override
- public Option<String> getLastReplicatedTime(String tableName) {
- throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
- }
-
- @Override
- public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
- throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
- }
-
- @Override
- public void deleteLastReplicatedTimeStamp(String tableName) {
- throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
- }
-
- @Override
- public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
- throw new UnsupportedOperationException("Not supported: `addPartitionsToTable`");
- }
-
- @Override
- public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
- throw new UnsupportedOperationException("Not supported: `updatePartitionsToTable`");
- }
-
- @Override
- public void dropPartitions(String tableName, List<String> partitionsToDrop) {
- throw new UnsupportedOperationException("Not supported: `dropPartitions`");
- }
-
@Override
public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
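The deletions above illustrate the point of the new operation interfaces (CatalogSync, PartitionsSync, ReplicatedTimeSync, TblPropertiesSync): instead of every client overriding every method with UnsupportedOperationException, a client declares only the capabilities it actually has, which is why DataHubSyncClient now implements just TblPropertiesSync. A hedged sketch of the idea, with illustrative names:

    import java.util.HashMap;
    import java.util.Map;

    interface TblPropertiesSyncOp {
      void updateTableProperties(String tableName, Map<String, String> tableProperties);
    }

    // A catalog client that only pushes table properties; partition and
    // replication capabilities are simply not declared, so no stubs are needed.
    class PropertiesOnlyClient implements TblPropertiesSyncOp {
      private final Map<String, Map<String, String>> catalog = new HashMap<>();

      @Override
      public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
        catalog.computeIfAbsent(tableName, k -> new HashMap<>()).putAll(tableProperties);
      }
    }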
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 9633d6b089..6502dcd1ce 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -21,7 +21,7 @@ package org.apache.hudi.sync.datahub;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
import com.beust.jcommander.JCommander;
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
* @Experimental
* @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
*/
-public class DataHubSyncTool extends AbstractSyncTool {
+public class DataHubSyncTool extends HoodieSyncTool {
private final DataHubSyncConfig config;
@@ -56,8 +56,8 @@ public class DataHubSyncTool extends AbstractSyncTool {
@Override
public void syncHoodieTable() {
try (DataHubSyncClient syncClient = new DataHubSyncClient(config, conf, fs)) {
- syncClient.updateTableDefinition(config.tableName);
- syncClient.updateLastCommitTimeSynced(config.tableName);
+ syncClient.updateTableDefinition(config.hoodieSyncConfigParams.tableName);
+ syncClient.updateLastCommitTimeSynced(config.hoodieSyncConfigParams.tableName);
}
}
@@ -68,7 +68,7 @@ public class DataHubSyncTool extends AbstractSyncTool {
cmd.usage();
System.exit(1);
}
- FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
new DataHubSyncTool(cfg, fs.getConf(), fs).syncHoodieTable();
}
}
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index e3c1ad486c..0f9f9ece9e 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -43,6 +43,6 @@ public class HoodieDataHubDatasetIdentifier {
public DatasetUrn getDatasetUrn() {
DataPlatformUrn dataPlatformUrn = new DataPlatformUrn(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME);
DataHubSyncConfig config = new DataHubSyncConfig(props);
- return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.databaseName, config.tableName), FabricType.DEV);
+ return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName), FabricType.DEV);
}
}
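For reference, DatasetUrn renders in DataHub's standard URN string form. A tiny sketch mirroring the construction above with example values (the actual platform name comes from DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME; "hudi" here is an assumption):

    import com.linkedin.common.FabricType;
    import com.linkedin.common.urn.DataPlatformUrn;
    import com.linkedin.common.urn.DatasetUrn;

    public class UrnDemo {
      public static void main(String[] args) throws Exception {
        DatasetUrn urn = new DatasetUrn(new DataPlatformUrn("hudi"), "my_db.my_table", FabricType.DEV);
        // Prints: urn:li:dataset:(urn:li:dataPlatform:hudi,my_db.my_table,DEV)
        System.out.println(urn);
      }
    }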
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
index f0641b6fc0..b52237c698 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
import org.apache.hudi.sync.common.HoodieSyncException;
import org.apache.hudi.sync.common.model.Partition;
@@ -31,6 +31,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.ReplicatedTimeSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
import org.apache.parquet.schema.MessageType;
import java.util.ArrayList;
@@ -41,7 +45,8 @@ import java.util.Map;
/**
* Base class to sync Hudi tables with Hive based metastores, such as Hive server, HMS or managed Hive services.
*/
-public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractHiveSyncHoodieClient extends HoodieSyncClient implements ReplicatedTimeSync,
+ PartitionsSync, CatalogSync, TblPropertiesSync {
protected final HoodieTimeline activeTimeline;
protected final HiveSyncConfig syncConfig;
@@ -49,10 +54,11 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
protected final PartitionValueExtractor partitionValueExtractor;
public AbstractHiveSyncHoodieClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
- super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, syncConfig.withOperationField, fs);
+ super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+ syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, syncConfig.hiveSyncConfigParams.withOperationField, fs);
this.syncConfig = syncConfig;
this.hadoopConf = hadoopConf;
- this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.partitionValueExtractorClass);
+ this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass);
this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
}
@@ -75,7 +81,7 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
List<PartitionEvent> events = new ArrayList<>();
for (String storagePartition : partitionStoragePartitions) {
- Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
+ Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, storagePartition);
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 36dba81a33..66d17661a8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -24,79 +24,14 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
import com.beust.jcommander.Parameter;
+import java.io.Serializable;
+
/**
* Configs needed to sync data into the Hive Metastore.
*/
public class HiveSyncConfig extends HoodieSyncConfig {
- @Parameter(names = {"--user"}, description = "Hive username")
- public String hiveUser;
-
- @Parameter(names = {"--pass"}, description = "Hive password")
- public String hivePass;
-
- @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
- public String jdbcUrl;
-
- @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
- public String metastoreUris;
-
- @Parameter(names = {"--use-pre-apache-input-format"},
- description = "Use InputFormat under com.uber.hoodie package "
- + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
- + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
- + "org.apache.hudi input format.")
- public Boolean usePreApacheInputFormat;
-
- @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
- public String bucketSpec;
-
- @Deprecated
- @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
- public Boolean useJdbc;
-
- @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
- public String syncMode;
-
- @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
- public Boolean autoCreateDatabase;
-
- @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
- public Boolean ignoreExceptions;
-
- @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
- public Boolean skipROSuffix;
-
- @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
- public String tableProperties;
-
- @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
- public String serdeProperties;
-
- @Parameter(names = {"--help", "-h"}, help = true)
- public Boolean help = false;
-
- @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
- + "Disabled by default for backward compatibility.")
- public Boolean supportTimestamp;
-
- @Parameter(names = {"--managed-table"}, description = "Create a managed table")
- public Boolean createManagedTable;
-
- @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
- public Integer batchSyncNum;
-
- @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
- public Boolean syncAsSparkDataSourceTable;
-
- @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
- public int sparkSchemaLengthThreshold;
-
- @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
- public Boolean withOperationField = false;
-
- @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
- public boolean syncComment = false;
+ public final HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfigParams();
// HIVE SYNC SPECIFIC CONFIGS
// NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
@@ -223,64 +158,118 @@ public class HiveSyncConfig extends HoodieSyncConfig {
public HiveSyncConfig(TypedProperties props) {
super(props);
- this.hiveUser = getStringOrDefault(HIVE_USER);
- this.hivePass = getStringOrDefault(HIVE_PASS);
- this.jdbcUrl = getStringOrDefault(HIVE_URL);
- this.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
- this.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
- this.metastoreUris = getStringOrDefault(METASTORE_URIS);
- this.syncMode = getString(HIVE_SYNC_MODE);
- this.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
- this.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
- this.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
- this.tableProperties = getString(HIVE_TABLE_PROPERTIES);
- this.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
- this.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
- this.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
- this.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
- this.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
- this.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
- this.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
- this.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
+ this.hiveSyncConfigParams.hiveUser = getStringOrDefault(HIVE_USER);
+ this.hiveSyncConfigParams.hivePass = getStringOrDefault(HIVE_PASS);
+ this.hiveSyncConfigParams.jdbcUrl = getStringOrDefault(HIVE_URL);
+ this.hiveSyncConfigParams.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
+ this.hiveSyncConfigParams.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
+ this.hiveSyncConfigParams.metastoreUris = getStringOrDefault(METASTORE_URIS);
+ this.hiveSyncConfigParams.syncMode = getString(HIVE_SYNC_MODE);
+ this.hiveSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
+ this.hiveSyncConfigParams.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
+ this.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
+ this.hiveSyncConfigParams.tableProperties = getString(HIVE_TABLE_PROPERTIES);
+ this.hiveSyncConfigParams.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
+ this.hiveSyncConfigParams.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
+ this.hiveSyncConfigParams.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+ this.hiveSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
+ this.hiveSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+ this.hiveSyncConfigParams.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
+ this.hiveSyncConfigParams.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
+ this.hiveSyncConfigParams.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
}
@Override
public String toString() {
return "HiveSyncConfig{"
- + "databaseName='" + databaseName + '\''
- + ", tableName='" + tableName + '\''
- + ", bucketSpec='" + bucketSpec + '\''
- + ", baseFileFormat='" + baseFileFormat + '\''
- + ", hiveUser='" + hiveUser + '\''
- + ", hivePass='" + hivePass + '\''
- + ", jdbcUrl='" + jdbcUrl + '\''
- + ", metastoreUris='" + metastoreUris + '\''
- + ", basePath='" + basePath + '\''
- + ", partitionFields=" + partitionFields
- + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
- + ", assumeDatePartitioning=" + assumeDatePartitioning
- + ", usePreApacheInputFormat=" + usePreApacheInputFormat
- + ", useJdbc=" + useJdbc
- + ", autoCreateDatabase=" + autoCreateDatabase
- + ", ignoreExceptions=" + ignoreExceptions
- + ", skipROSuffix=" + skipROSuffix
- + ", useFileListingFromMetadata=" + useFileListingFromMetadata
- + ", tableProperties='" + tableProperties + '\''
- + ", serdeProperties='" + serdeProperties + '\''
- + ", help=" + help
- + ", supportTimestamp=" + supportTimestamp
- + ", decodePartition=" + decodePartition
- + ", createManagedTable=" + createManagedTable
- + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
- + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
- + ", withOperationField=" + withOperationField
- + ", isConditionalSync=" + isConditionalSync
- + ", sparkVersion=" + sparkVersion
- + ", syncComment=" + syncComment
+ + "databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+ + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+ + ", bucketSpec='" + hiveSyncConfigParams.bucketSpec + '\''
+ + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+ + ", hiveUser='" + hiveSyncConfigParams.hiveUser + '\''
+ + ", hivePass='" + hiveSyncConfigParams.hivePass + '\''
+ + ", jdbcUrl='" + hiveSyncConfigParams.jdbcUrl + '\''
+ + ", metastoreUris='" + hiveSyncConfigParams.metastoreUris + '\''
+ + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+ + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+ + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+ + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+ + ", usePreApacheInputFormat=" + hiveSyncConfigParams.usePreApacheInputFormat
+ + ", useJdbc=" + hiveSyncConfigParams.useJdbc
+ + ", autoCreateDatabase=" + hiveSyncConfigParams.autoCreateDatabase
+ + ", ignoreExceptions=" + hiveSyncConfigParams.ignoreExceptions
+ + ", skipROSuffix=" + hiveSyncConfigParams.skipROSuffix
+ + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+ + ", tableProperties='" + hiveSyncConfigParams.tableProperties + '\''
+ + ", serdeProperties='" + hiveSyncConfigParams.serdeProperties + '\''
+ + ", help=" + hiveSyncConfigParams.help
+ + ", supportTimestamp=" + hiveSyncConfigParams.supportTimestamp
+ + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+ + ", createManagedTable=" + hiveSyncConfigParams.createManagedTable
+ + ", syncAsSparkDataSourceTable=" + hiveSyncConfigParams.syncAsSparkDataSourceTable
+ + ", sparkSchemaLengthThreshold=" + hiveSyncConfigParams.sparkSchemaLengthThreshold
+ + ", withOperationField=" + hiveSyncConfigParams.withOperationField
+ + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+ + ", sparkVersion=" + hoodieSyncConfigParams.sparkVersion
+ + ", syncComment=" + hiveSyncConfigParams.syncComment
+ '}';
}
public static String getBucketSpec(String bucketCols, int bucketNum) {
return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
}
+
+ public static class HiveSyncConfigParams implements Serializable {
+ @Parameter(names = {"--user"}, description = "Hive username")
+ public String hiveUser;
+ @Parameter(names = {"--pass"}, description = "Hive password")
+ public String hivePass;
+ @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
+ public String jdbcUrl;
+ @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
+ public String metastoreUris;
+ @Parameter(names = {"--use-pre-apache-input-format"},
+ description = "Use InputFormat under com.uber.hoodie package "
+ + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+ + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+ + "org.apache.hudi input format.")
+ public Boolean usePreApacheInputFormat;
+ @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
+ public String bucketSpec;
+ @Deprecated
+ @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
+ public Boolean useJdbc;
+ @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
+ public String syncMode;
+ @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
+ public Boolean autoCreateDatabase;
+ @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+ public Boolean ignoreExceptions;
+ @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
+ public Boolean skipROSuffix;
+ @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
+ public String tableProperties;
+ @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
+ public String serdeProperties;
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+ @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+ + "Disabled by default for backward compatibility.")
+ public Boolean supportTimestamp;
+ @Parameter(names = {"--managed-table"}, description = "Create a managed table")
+ public Boolean createManagedTable;
+ @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
+ public Integer batchSyncNum;
+ @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
+ public Boolean syncAsSparkDataSourceTable;
+ @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
+ public int sparkSchemaLengthThreshold;
+ @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+ public Boolean withOperationField = false;
+ @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
+ public boolean syncComment = false;
+
+ public HiveSyncConfigParams() {
+ }
+ }
}
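
Note: for `new JCommander(cfg, null, args)` (used in HiveSyncTool.main below) to recognize flags declared on the nested HiveSyncConfigParams, the enclosing config class presumably exposes the holder through an @ParametersDelegate field; that declaration is not visible in this hunk. A minimal sketch of the wiring, with illustrative class and field names:

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;

public class DelegateSketch {
  // Stand-in for the nested HiveSyncConfigParams holder.
  public static class Params {
    @Parameter(names = {"--user"}, description = "Hive username")
    public String hiveUser;
  }

  // JCommander recurses into @ParametersDelegate fields while parsing,
  // so flags declared on the nested holder are still recognized.
  @ParametersDelegate
  public Params hiveSyncConfigParams = new Params();

  public static void main(String[] args) {
    String[] argv = {"--user", "hive"};
    DelegateSketch cfg = new DelegateSketch();
    new JCommander(cfg, null, argv);
    System.out.println(cfg.hiveSyncConfigParams.hiveUser); // prints: hive
  }
}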
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 4d6fad033b..ded0756f8e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -29,9 +29,9 @@ import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.model.Partition;
import com.beust.jcommander.JCommander;
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -57,7 +58,7 @@ import java.util.stream.Collectors;
* partitions incrementally (all the partitions modified since the last commit)
*/
@SuppressWarnings("WeakerAccess")
-public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
+public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -76,7 +77,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
super(hiveSyncConfig.getProps(), hiveConf, fs);
// TODO: reconcile the way to set METASTOREURIS
if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.hiveSyncConfigParams.metastoreUris);
}
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
hiveConf.addResource(fs.getConf());
@@ -88,7 +89,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
try {
this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);
} catch (RuntimeException e) {
- if (hiveSyncConfig.ignoreExceptions) {
+ if (hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions) {
LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
} else {
throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
@@ -99,21 +100,21 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
private void initConfig(HiveSyncConfig hiveSyncConfig) {
// Set partitionFields to empty, when the NonPartitionedExtractor is used
// TODO: HiveSyncConfig should be responsible for inferring config value
- if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
+ if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass)) {
LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
- hiveSyncConfig.partitionFields = new ArrayList<>();
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = new ArrayList<>();
}
this.hiveSyncConfig = hiveSyncConfig;
if (hoodieHiveClient != null) {
switch (hoodieHiveClient.getTableType()) {
case COPY_ON_WRITE:
- this.snapshotTableName = hiveSyncConfig.tableName;
+ this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName;
this.roTableName = Option.empty();
break;
case MERGE_ON_READ:
- this.snapshotTableName = hiveSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
- this.roTableName = hiveSyncConfig.skipROSuffix ? Option.of(hiveSyncConfig.tableName) :
- Option.of(hiveSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+ this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+ this.roTableName = hiveSyncConfig.hiveSyncConfigParams.skipROSuffix ? Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName) :
+ Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
break;
default:
LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -126,13 +127,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
public void syncHoodieTable() {
try {
if (hoodieHiveClient != null) {
- LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
- + hiveSyncConfig.jdbcUrl + ", basePath :" + hiveSyncConfig.basePath);
+ LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.hoodieSyncConfigParams.tableName + "). Hive metastore URL :"
+ + hiveSyncConfig.hiveSyncConfigParams.jdbcUrl + ", basePath :" + hiveSyncConfig.hoodieSyncConfigParams.basePath);
doSync();
}
} catch (RuntimeException re) {
- throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re);
+ throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.hoodieSyncConfigParams.tableName, re);
} finally {
close();
}
@@ -172,19 +173,19 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
+ " of type " + hoodieHiveClient.getTableType());
// check if the database exists else create it
- if (hiveSyncConfig.autoCreateDatabase) {
+ if (hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase) {
try {
- if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
- hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName);
+ if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+ hoodieHiveClient.createDatabase(hiveSyncConfig.hoodieSyncConfigParams.databaseName);
}
} catch (Exception e) {
// this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
LOG.warn("Unable to create database", e);
}
} else {
- if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
- LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName);
- throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName);
+ if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+ LOG.error("Hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+ throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
}
}
@@ -204,7 +205,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
- hiveSyncConfig.syncAsSparkDataSourceTable = false;
+ hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
@@ -223,7 +224,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
// Sync the partitions if needed
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
- if (!hiveSyncConfig.isConditionalSync || meetSyncConditions) {
+ if (!hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
}
LOG.info("Sync complete for " + tableName);
@@ -239,12 +240,12 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
boolean readAsOptimized, MessageType schema) {
// Append spark table properties & serde properties
- Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
- Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
- if (hiveSyncConfig.syncAsSparkDataSourceTable) {
- Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields,
- hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema);
- Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath);
+ Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.tableProperties);
+ Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.serdeProperties);
+ if (hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
+ Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+ hiveSyncConfig.hoodieSyncConfigParams.sparkVersion, hiveSyncConfig.hiveSyncConfigParams.sparkSchemaLengthThreshold, schema);
+ Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.hoodieSyncConfigParams.basePath);
tableProperties.putAll(sparkTableProperties);
serdeProperties.putAll(sparkSerdeProperties);
}
@@ -252,10 +253,10 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
// Check and sync schema
if (!tableExists) {
LOG.info("Hive table " + tableName + " is not found. Creating it");
- HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.baseFileFormat.toUpperCase());
+ HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat.toUpperCase());
String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
- if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.usePreApacheInputFormat) {
+ if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat) {
// Parquet input format had an InputFormat class visible under the old naming scheme.
inputFormatClassName = useRealTimeInputFormat
? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
@@ -274,12 +275,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
} else {
// Check if the table schema has evolved
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
- SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.partitionFields, hiveSyncConfig.supportTimestamp);
+ SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+ hiveSyncConfig.hiveSyncConfigParams.supportTimestamp);
if (!schemaDiff.isEmpty()) {
LOG.info("Schema difference found for " + tableName);
hoodieHiveClient.updateTableDefinition(tableName, schema);
// Sync the table properties if the schema has changed
- if (hiveSyncConfig.tableProperties != null || hiveSyncConfig.syncAsSparkDataSourceTable) {
+ if (hiveSyncConfig.hiveSyncConfigParams.tableProperties != null || hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
hoodieHiveClient.updateTableProperties(tableName, tableProperties);
LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties);
}
@@ -289,7 +291,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
}
}
- if (hiveSyncConfig.syncComment) {
+ if (hiveSyncConfig.hiveSyncConfigParams.syncComment) {
Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
.stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
@@ -349,11 +351,11 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
// parse the params
final HiveSyncConfig cfg = new HiveSyncConfig();
JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
+ if (cfg.hiveSyncConfigParams.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
- FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
HiveConf hiveConf = new HiveConf();
hiveConf.addResource(fs.getConf());
new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
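
The snapshot/read-optimized naming in initConfig(...) above is easiest to see with concrete values. A small illustration (table name and flag value are hypothetical):

public class TableNameSketch {
  public static void main(String[] args) {
    String tableName = "trips";          // hypothetical table
    boolean skipROSuffix = false;        // --skip-ro-suffix
    // MERGE_ON_READ: a snapshot ("_rt") table plus a read-optimized table.
    String snapshotTableName = tableName + "_rt";
    String roTableName = skipROSuffix ? tableName : tableName + "_ro";
    System.out.println(snapshotTableName + " / " + roTableName); // trips_rt / trips_ro
    // COPY_ON_WRITE: the snapshot table is just tableName, with no RO table.
  }
}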
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 539d18a213..a14f4f8a89 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -66,8 +66,8 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
// Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
try {
- if (!StringUtils.isNullOrEmpty(cfg.syncMode)) {
- HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode);
+ if (!StringUtils.isNullOrEmpty(cfg.hiveSyncConfigParams.syncMode)) {
+ HiveSyncMode syncMode = HiveSyncMode.of(cfg.hiveSyncConfigParams.syncMode);
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs);
@@ -79,10 +79,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
ddlExecutor = new JDBCExecutor(cfg, fs);
break;
default:
- throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.syncMode);
+ throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.hiveSyncConfigParams.syncMode);
}
} else {
- ddlExecutor = cfg.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
+ ddlExecutor = cfg.hiveSyncConfigParams.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
}
this.client = Hive.get(configuration).getMSC();
} catch (Exception e) {
@@ -123,11 +123,11 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
return;
}
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
table.putToParameters(entry.getKey(), entry.getValue());
}
- client.alter_table(syncConfig.databaseName, tableName, table);
+ client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to update table properties for table: "
+ tableName, e);
@@ -141,7 +141,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
*/
@Deprecated
public List<org.apache.hadoop.hive.metastore.api.Partition> scanTablePartitions(String tableName) throws TException {
- return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
+ return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1);
}
@Override
@@ -152,12 +152,12 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
@Override
public List<Partition> getAllPartitions(String tableName) {
try {
- return client.listPartitions(syncConfig.databaseName, tableName, (short) -1)
+ return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1)
.stream()
.map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
.collect(Collectors.toList());
} catch (TException e) {
- throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.databaseName, tableName), e);
+ throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.hoodieSyncConfigParams.databaseName, tableName), e);
}
}
@@ -189,7 +189,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
@Override
public boolean tableExists(String tableName) {
try {
- return client.tableExists(syncConfig.databaseName, tableName);
+ return client.tableExists(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
} catch (TException e) {
throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
}
@@ -222,7 +222,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
public Option<String> getLastCommitTimeSynced(String tableName) {
// Get the last commit time from the TBLproperties
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
return Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table " + tableName, e);
@@ -232,10 +232,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
public Option<String> getLastReplicatedTime(String tableName) {
// Get the last replicated time from the TBLproperties
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
} catch (NoSuchObjectException e) {
- LOG.warn("the said table not found in hms " + syncConfig.databaseName + "." + tableName);
+ LOG.warn("the said table not found in hms " + syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName);
return Option.empty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get the last replicated time from the table " + tableName, e);
@@ -249,9 +249,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
"Not a valid completed timestamp " + timeStamp + " for table " + tableName);
}
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
- client.alter_table(syncConfig.databaseName, tableName, table);
+ client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException(
"Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
@@ -260,9 +260,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
public void deleteLastReplicatedTimeStamp(String tableName) {
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
String timestamp = table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
- client.alter_table(syncConfig.databaseName, tableName, table);
+ client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
if (timestamp != null) {
LOG.info("deleted last replicated timestamp " + timestamp + " for table " + tableName);
}
@@ -293,9 +293,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
if (lastCommitSynced.isPresent()) {
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
- client.alter_table(syncConfig.databaseName, tableName, table);
+ client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
}
@@ -305,7 +305,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
@Override
public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
try {
- return client.getSchema(syncConfig.databaseName, tableName);
+ return client.getSchema(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table comments for : " + tableName, e);
}
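
The checkpoint updates above (updateLastCommitTimeSynced, updateLastReplicatedTimeStamp) all follow one read-modify-write pattern against Hive TBLPROPERTIES. A condensed sketch, assuming an already-constructed IMetaStoreClient; the property key mirrors HOODIE_LAST_COMMIT_TIME_SYNC, though its literal value here is an assumption:

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class SyncCheckpointSketch {
  // Read the table, stamp the last-synced commit in TBLPROPERTIES, write back.
  static void stampLastCommitSynced(IMetaStoreClient client, String db, String tbl,
      String commitTime) throws Exception {
    Table table = client.getTable(db, tbl);
    table.putToParameters("last_commit_time_sync", commitTime); // assumed literal of HOODIE_LAST_COMMIT_TIME_SYNC
    client.alter_table(db, tbl, table);
  }
}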
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 868f59b4fe..73fd31a51a 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -71,10 +71,10 @@ public class HMSDDLExecutor implements DDLExecutor {
this.fs = fs;
try {
this.partitionValueExtractor =
- (PartitionValueExtractor) Class.forName(syncConfig.partitionValueExtractorClass).newInstance();
+ (PartitionValueExtractor) Class.forName(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
- "Failed to initialize PartitionValueExtractor class " + syncConfig.partitionValueExtractorClass, e);
+ "Failed to initialize PartitionValueExtractor class " + syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass, e);
}
}
@@ -93,16 +93,16 @@ public class HMSDDLExecutor implements DDLExecutor {
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) {
try {
- LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+ LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
List<FieldSchema> fieldSchema = HiveSchemaUtil.convertMapSchemaToHiveFieldSchema(mapSchema, syncConfig);
- List<FieldSchema> partitionSchema = syncConfig.partitionFields.stream().map(partitionKey -> {
+ List<FieldSchema> partitionSchema = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), "");
}).collect(Collectors.toList());
Table newTb = new Table();
- newTb.setDbName(syncConfig.databaseName);
+ newTb.setDbName(syncConfig.hoodieSyncConfigParams.databaseName);
newTb.setTableName(tableName);
newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
newTb.setCreateTime((int) (System.currentTimeMillis() / 1000L));
@@ -110,13 +110,13 @@ public class HMSDDLExecutor implements DDLExecutor {
storageDescriptor.setCols(fieldSchema);
storageDescriptor.setInputFormat(inputFormatClass);
storageDescriptor.setOutputFormat(outputFormatClass);
- storageDescriptor.setLocation(syncConfig.basePath);
+ storageDescriptor.setLocation(syncConfig.hoodieSyncConfigParams.basePath);
serdeProperties.put("serialization.format", "1");
storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
newTb.setSd(storageDescriptor);
newTb.setPartitionKeys(partitionSchema);
- if (!syncConfig.createManagedTable) {
+ if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
newTb.putToParameters("EXTERNAL", "TRUE");
}
@@ -134,9 +134,9 @@ public class HMSDDLExecutor implements DDLExecutor {
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
try {
- boolean cascade = syncConfig.partitionFields.size() > 0;
+ boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
List<FieldSchema> fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig);
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
StorageDescriptor sd = table.getSd();
sd.setCols(fieldSchema);
table.setSd(sd);
@@ -145,7 +145,7 @@ public class HMSDDLExecutor implements DDLExecutor {
LOG.info("partition table,need cascade");
environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
}
- client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+ client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
} catch (Exception e) {
LOG.error("Failed to update table for " + tableName, e);
throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
@@ -158,7 +158,7 @@ public class HMSDDLExecutor implements DDLExecutor {
// HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
final long start = System.currentTimeMillis();
- Table table = this.client.getTable(syncConfig.databaseName, tableName);
+ Table table = this.client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
Map<String, String> partitionKeysMap =
table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
@@ -184,22 +184,22 @@ public class HMSDDLExecutor implements DDLExecutor {
}
LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
try {
- StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+ StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
StorageDescriptor partitionSd = new StorageDescriptor();
partitionSd.setCols(sd.getCols());
partitionSd.setInputFormat(sd.getInputFormat());
partitionSd.setOutputFormat(sd.getOutputFormat());
partitionSd.setSerdeInfo(sd.getSerdeInfo());
- String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
partitionSd.setLocation(fullPartitionPath);
- return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, partitionSd, null);
+ return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, partitionSd, null);
}).collect(Collectors.toList());
client.add_partitions(partitionList, true, false);
} catch (TException e) {
- LOG.error(syncConfig.databaseName + "." + tableName + " add partition failed", e);
- throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " add partition failed", e);
+ LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
+ throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
}
}
@@ -211,20 +211,20 @@ public class HMSDDLExecutor implements DDLExecutor {
}
LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
try {
- StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+ StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
List<Partition> partitionList = changedPartitions.stream().map(partition -> {
- Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
+ Path partitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
sd.setLocation(fullPartitionPath);
- return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, sd, null);
+ return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, sd, null);
}).collect(Collectors.toList());
- client.alter_partitions(syncConfig.databaseName, tableName, partitionList, null);
+ client.alter_partitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionList, null);
} catch (TException e) {
- LOG.error(syncConfig.databaseName + "." + tableName + " update partition failed", e);
- throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " update partition failed", e);
+ LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
+ throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
}
}
@@ -241,20 +241,20 @@ public class HMSDDLExecutor implements DDLExecutor {
if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
- client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
+ client.dropPartition(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
}
LOG.info("Drop partition " + dropPartition + " on " + tableName);
}
} catch (TException e) {
- LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
- throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+ LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+ throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
}
}
@Override
public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> alterSchema) {
try {
- Table table = client.getTable(syncConfig.databaseName, tableName);
+ Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
StorageDescriptor sd = new StorageDescriptor(table.getSd());
for (FieldSchema fieldSchema : sd.getCols()) {
if (alterSchema.containsKey(fieldSchema.getName())) {
@@ -264,7 +264,7 @@ public class HMSDDLExecutor implements DDLExecutor {
}
table.setSd(sd);
EnvironmentContext environmentContext = new EnvironmentContext();
- client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+ client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
sd.clear();
} catch (Exception e) {
LOG.error("Failed to update table comments for " + tableName, e);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 4b8ceec952..f1cafbffdf 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -64,7 +64,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
this.sessionState = new SessionState(configuration,
UserGroupInformation.getCurrentUser().getShortUserName());
SessionState.start(this.sessionState);
- this.sessionState.setCurrentDatabase(config.databaseName);
+ this.sessionState.setCurrentDatabase(config.hoodieSyncConfigParams.databaseName);
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
} catch (Exception e) {
if (sessionState != null) {
@@ -109,7 +109,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
// HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
// get the Schema of the table.
final long start = System.currentTimeMillis();
- Table table = metaStoreClient.getTable(config.databaseName, tableName);
+ Table table = metaStoreClient.getTable(config.hoodieSyncConfigParams.databaseName, tableName);
Map<String, String> partitionKeysMap =
table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
@@ -141,13 +141,13 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
config)) {
String partitionClause =
HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
- metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
+ metaStoreClient.dropPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
}
LOG.info("Drop partition " + dropPartition + " on " + tableName);
}
} catch (Exception e) {
- LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
- throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+ LOG.error(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+ throw new HoodieHiveSyncException(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
}
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 997d6e087c..e1b33c93f5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -49,11 +49,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
public JDBCExecutor(HiveSyncConfig config, FileSystem fs) {
super(config, fs);
- Objects.requireNonNull(config.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
- Objects.requireNonNull(config.hiveUser, "--user option is required for jdbc sync mode");
- Objects.requireNonNull(config.hivePass, "--pass option is required for jdbc sync mode");
+ Objects.requireNonNull(config.hiveSyncConfigParams.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
+ Objects.requireNonNull(config.hiveSyncConfigParams.hiveUser, "--user option is required for jdbc sync mode");
+ Objects.requireNonNull(config.hiveSyncConfigParams.hivePass, "--pass option is required for jdbc sync mode");
this.config = config;
- createHiveConnection(config.jdbcUrl, config.hiveUser, config.hivePass);
+ createHiveConnection(config.hiveSyncConfigParams.jdbcUrl, config.hiveSyncConfigParams.hiveUser, config.hiveSyncConfigParams.hivePass);
}
@Override
@@ -126,7 +126,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
ResultSet result = null;
try {
DatabaseMetaData databaseMetaData = connection.getMetaData();
- result = databaseMetaData.getColumns(null, config.databaseName, tableName, null);
+ result = databaseMetaData.getColumns(null, config.hoodieSyncConfigParams.databaseName, tableName, null);
while (result.next()) {
String columnName = result.getString(4);
String columnType = result.getString(6);
@@ -157,11 +157,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
}
private List<String> constructDropPartitions(String tableName, List<String> partitions) {
- if (config.batchSyncNum <= 0) {
+ if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
List<String> result = new ArrayList<>();
- int batchSyncPartitionNum = config.batchSyncNum;
+ int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
for (int i = 0; i < partitions.size(); i++) {
@@ -186,7 +186,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
public StringBuilder getAlterTableDropPrefix(String tableName) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
- alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+ alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS ");
return alterSQL;
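
constructDropPartitions(...) above folds partition clauses into one ALTER TABLE statement per --batch-sync-num entries. A minimal, self-contained sketch of that batching pattern (table name and clause syntax are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DropBatchSketch {
  public static void main(String[] args) {
    List<String> clauses = Arrays.asList("`dt`='2022-06-11'", "`dt`='2022-06-12'", "`dt`='2022-06-13'");
    int batchSyncNum = 2; // --batch-sync-num
    List<String> statements = new ArrayList<>();
    StringBuilder sql = new StringBuilder("ALTER TABLE `db`.`tbl` DROP IF EXISTS ");
    for (int i = 0; i < clauses.size(); i++) {
      sql.append(" PARTITION (").append(clauses.get(i)).append(")");
      boolean batchFull = (i + 1) % batchSyncNum == 0;
      if (batchFull || i == clauses.size() - 1) {
        statements.add(sql.toString()); // flush a full (or final partial) batch
        sql = new StringBuilder("ALTER TABLE `db`.`tbl` DROP IF EXISTS ");
      }
    }
    statements.forEach(System.out::println); // two statements for three partitions
  }
}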
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index d9b663ccb0..70b671595b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -55,10 +55,10 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
this.config = config;
try {
this.partitionValueExtractor =
- (PartitionValueExtractor) Class.forName(config.partitionValueExtractorClass).newInstance();
+ (PartitionValueExtractor) Class.forName(config.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
} catch (Exception e) {
throw new HoodieHiveSyncException(
- "Failed to initialize PartitionValueExtractor class " + config.partitionValueExtractorClass, e);
+ "Failed to initialize PartitionValueExtractor class " + config.hoodieSyncConfigParams.partitionValueExtractorClass, e);
}
}
@@ -90,11 +90,11 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
@Override
public void updateTableDefinition(String tableName, MessageType newSchema) {
try {
- String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.partitionFields, config.supportTimestamp);
+ String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
// Cascade clause should not be present for non-partitioned tables
- String cascadeClause = config.partitionFields.size() > 0 ? " cascade" : "";
+ String cascadeClause = config.hoodieSyncConfigParams.partitionFields.size() > 0 ? " cascade" : "";
StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
- .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+ .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
.append(HIVE_ESCAPE_CHARACTER).append(tableName)
.append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
.append(newSchemaStr).append(" )").append(cascadeClause);
@@ -138,7 +138,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
String comment = field.getValue().getRight();
comment = comment.replace("'","");
sql.append("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
- .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+ .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
.append(HIVE_ESCAPE_CHARACTER).append(tableName)
.append(HIVE_ESCAPE_CHARACTER)
.append(" CHANGE COLUMN `").append(name).append("` `").append(name)
@@ -148,15 +148,15 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
}
private List<String> constructAddPartitions(String tableName, List<String> partitions) {
- if (config.batchSyncNum <= 0) {
+ if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
}
List<String> result = new ArrayList<>();
- int batchSyncPartitionNum = config.batchSyncNum;
+ int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
StringBuilder alterSQL = getAlterTablePrefix(tableName);
for (int i = 0; i < partitions.size(); i++) {
String partitionClause = getPartitionClause(partitions.get(i));
- String fullPartitionPath = FSUtils.getPartitionPath(config.basePath, partitions.get(i)).toString();
+ String fullPartitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partitions.get(i)).toString();
alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
.append("' ");
if ((i + 1) % batchSyncPartitionNum == 0) {
@@ -173,7 +173,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
private StringBuilder getAlterTablePrefix(String tableName) {
StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
- alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+ alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
.append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
.append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
return alterSQL;
@@ -181,18 +181,18 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
public String getPartitionClause(String partition) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
- ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
- "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+ ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+ "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
- for (int i = 0; i < config.partitionFields.size(); i++) {
+ for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
String partitionValue = partitionValues.get(i);
// decode the partition before sync to hive to prevent multiple escapes of HIVE
- if (config.decodePartition) {
+ if (config.hoodieSyncConfigParams.decodePartition) {
// This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
}
- partBuilder.add("`" + config.partitionFields.get(i) + "`='" + partitionValue + "'");
+ partBuilder.add("`" + config.hoodieSyncConfigParams.partitionFields.get(i) + "`='" + partitionValue + "'");
}
return String.join(",", partBuilder);
}
@@ -200,12 +200,12 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
private List<String> constructChangePartitions(String tableName, List<String> partitions) {
List<String> changePartitions = new ArrayList<>();
// Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
- String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.databaseName + HIVE_ESCAPE_CHARACTER;
+ String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.hoodieSyncConfigParams.databaseName + HIVE_ESCAPE_CHARACTER;
changePartitions.add(useDatabase);
String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
- Path partitionPath = FSUtils.getPartitionPath(config.basePath, partition);
+ Path partitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partition);
String partitionScheme = partitionPath.toUri().getScheme();
String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
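
A worked example of the clause getPartitionClause(...) assembles, assuming partitionFields = [year, month, day], values already extracted by the PartitionValueExtractor from a path like 2022/06/13, and no URL decoding:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionClauseSketch {
  public static void main(String[] args) {
    List<String> partitionFields = Arrays.asList("year", "month", "day");
    List<String> partitionValues = Arrays.asList("2022", "06", "13");
    List<String> partBuilder = new ArrayList<>();
    for (int i = 0; i < partitionFields.size(); i++) {
      partBuilder.add("`" + partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
    }
    // Prints: `year`='2022',`month`='06',`day`='13'
    System.out.println(String.join(",", partBuilder));
  }
}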
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
index 16c30a16aa..6ec4a586b8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -36,20 +36,20 @@ public class GlobalHiveSyncConfig extends HiveSyncConfig {
public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(cfg.getProps());
- newConfig.basePath = cfg.basePath;
- newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
- newConfig.databaseName = cfg.databaseName;
- newConfig.hivePass = cfg.hivePass;
- newConfig.hiveUser = cfg.hiveUser;
- newConfig.partitionFields = cfg.partitionFields;
- newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
- newConfig.jdbcUrl = cfg.jdbcUrl;
- newConfig.tableName = cfg.tableName;
- newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
- newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
- newConfig.supportTimestamp = cfg.supportTimestamp;
- newConfig.decodePartition = cfg.decodePartition;
- newConfig.batchSyncNum = cfg.batchSyncNum;
+ newConfig.hoodieSyncConfigParams.basePath = cfg.hoodieSyncConfigParams.basePath;
+ newConfig.hoodieSyncConfigParams.assumeDatePartitioning = cfg.hoodieSyncConfigParams.assumeDatePartitioning;
+ newConfig.hoodieSyncConfigParams.databaseName = cfg.hoodieSyncConfigParams.databaseName;
+ newConfig.hiveSyncConfigParams.hivePass = cfg.hiveSyncConfigParams.hivePass;
+ newConfig.hiveSyncConfigParams.hiveUser = cfg.hiveSyncConfigParams.hiveUser;
+ newConfig.hoodieSyncConfigParams.partitionFields = cfg.hoodieSyncConfigParams.partitionFields;
+ newConfig.hoodieSyncConfigParams.partitionValueExtractorClass = cfg.hoodieSyncConfigParams.partitionValueExtractorClass;
+ newConfig.hiveSyncConfigParams.jdbcUrl = cfg.hiveSyncConfigParams.jdbcUrl;
+ newConfig.hoodieSyncConfigParams.tableName = cfg.hoodieSyncConfigParams.tableName;
+ newConfig.hiveSyncConfigParams.usePreApacheInputFormat = cfg.hiveSyncConfigParams.usePreApacheInputFormat;
+ newConfig.hoodieSyncConfigParams.useFileListingFromMetadata = cfg.hoodieSyncConfigParams.useFileListingFromMetadata;
+ newConfig.hiveSyncConfigParams.supportTimestamp = cfg.hiveSyncConfigParams.supportTimestamp;
+ newConfig.hoodieSyncConfigParams.decodePartition = cfg.hoodieSyncConfigParams.decodePartition;
+ newConfig.hiveSyncConfigParams.batchSyncNum = cfg.hiveSyncConfigParams.batchSyncNum;
newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
return newConfig;
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
index a7d205962e..92c9631ab2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -80,7 +80,7 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
}
public static GlobalHiveSyncTool buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
- FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
hiveConf.addResource(fs.getConf());
return new GlobalHiveSyncTool(cfg, hiveConf, fs);
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
index c3dd2af346..917bc1e123 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
@@ -74,12 +74,12 @@ public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
- cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
- : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
- cfg.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
- : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
- LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.jdbcUrl + " "
- + cfg.basePath);
+ cfg.hoodieSyncConfigParams.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
+ : properties.getProperty(LOCAL_BASE_PATH, cfg.hoodieSyncConfigParams.basePath);
+ cfg.hiveSyncConfigParams.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
+ : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.hiveSyncConfigParams.jdbcUrl);
+ LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.hiveSyncConfigParams.jdbcUrl + " "
+ + cfg.hoodieSyncConfigParams.basePath);
return cfg;
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
index a194eeb2e9..4d15d8db1e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -104,7 +104,7 @@ public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, AutoClose
throws IOException {
HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help || args.length == 0) {
+ if (cfg.hiveSyncConfigParams.help || args.length == 0) {
cmd.usage();
System.exit(1);
}
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
index 0258cfc5ef..e17fe8af22 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -40,18 +40,18 @@ public class HivePartitionUtil {
*/
public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
- ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
- "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+ ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+ "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
- for (int i = 0; i < config.partitionFields.size(); i++) {
+ for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
String partitionValue = partitionValues.get(i);
// decode the partition before sync to hive to prevent multiple escapes of HIVE
- if (config.decodePartition) {
+ if (config.hoodieSyncConfigParams.decodePartition) {
// This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
}
- partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+ partBuilder.add(config.hoodieSyncConfigParams.partitionFields.get(i) + "=" + partitionValue);
}
return String.join("/", partBuilder);
}
@@ -61,7 +61,7 @@ public class HivePartitionUtil {
Partition newPartition;
try {
List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath);
- newPartition = client.getPartition(config.databaseName, tableName, partitionValues);
+ newPartition = client.getPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionValues);
} catch (NoSuchObjectException ignored) {
newPartition = null;
} catch (TException e) {
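
By contrast with getPartitionClause, getPartitionClauseForDrop(...) above emits unquoted field=value pairs joined with "/", i.e. the partition-name form accepted by IMetaStoreClient.dropPartition(db, table, name, deleteData). A worked example with the same hypothetical fields:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class DropClauseSketch {
  public static void main(String[] args) {
    List<String> fields = Arrays.asList("year", "month", "day");
    List<String> values = Arrays.asList("2022", "06", "13");
    String name = IntStream.range(0, fields.size())
        .mapToObj(i -> fields.get(i) + "=" + values.get(i))
        .collect(Collectors.joining("/"));
    System.out.println(name); // year=2022/month=06/day=13
  }
}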
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 2d700596f0..85faa012d2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -156,7 +156,7 @@ public class HiveSchemaUtil {
* @return : Hive Table schema read from parquet file List[FieldSchema] without partitionField
*/
public static List<FieldSchema> convertParquetSchemaToHiveFieldSchema(MessageType messageType, HiveSyncConfig syncConfig) throws IOException {
- return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.supportTimestamp, false), syncConfig);
+ return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.hiveSyncConfigParams.supportTimestamp, false), syncConfig);
}
/**
@@ -202,7 +202,7 @@ public class HiveSchemaUtil {
public static List<FieldSchema> convertMapSchemaToHiveFieldSchema(LinkedHashMap<String, String> schema, HiveSyncConfig syncConfig) throws IOException {
return schema.keySet().stream()
.map(key -> new FieldSchema(key, schema.get(key).toLowerCase(), ""))
- .filter(field -> !syncConfig.partitionFields.contains(field.getName()))
+ .filter(field -> !syncConfig.hoodieSyncConfigParams.partitionFields.contains(field.getName()))
.collect(Collectors.toList());
}
@@ -448,11 +448,11 @@ public class HiveSchemaUtil {
public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
Map<String, String> tableProperties) throws IOException {
- Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
- String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);
+ Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.hiveSyncConfigParams.supportTimestamp);
+ String columns = generateSchemaString(storageSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
List<String> partitionFields = new ArrayList<>();
- for (String partitionKey : config.partitionFields) {
+ for (String partitionKey : config.hoodieSyncConfigParams.partitionFields) {
String partitionKeyWithTicks = tickSurround(partitionKey);
partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
.append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
@@ -460,26 +460,26 @@ public class HiveSchemaUtil {
String partitionsStr = String.join(",", partitionFields);
StringBuilder sb = new StringBuilder();
- if (config.createManagedTable) {
+ if (config.hiveSyncConfigParams.createManagedTable) {
sb.append("CREATE TABLE IF NOT EXISTS ");
} else {
sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
}
- sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
+ sb.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER)
.append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
sb.append("( ").append(columns).append(")");
- if (!config.partitionFields.isEmpty()) {
+ if (!config.hoodieSyncConfigParams.partitionFields.isEmpty()) {
sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
}
- if (config.bucketSpec != null) {
- sb.append(' ' + config.bucketSpec + ' ');
+ if (config.hiveSyncConfigParams.bucketSpec != null) {
+ sb.append(' ' + config.hiveSyncConfigParams.bucketSpec + ' ');
}
sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
if (serdeProperties != null && !serdeProperties.isEmpty()) {
sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
}
sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
- sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'");
+ sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.hoodieSyncConfigParams.basePath).append("'");
if (tableProperties != null && !tableProperties.isEmpty()) {
sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
index 937243393f..aa1c606205 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
@@ -57,22 +57,22 @@ public class TestHiveSyncGlobalCommitTool {
config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
config.globallyReplicatedTimeStamp = commitTime;
- config.hiveUser = System.getProperty("user.name");
- config.hivePass = "";
- config.databaseName = dbName;
- config.tableName = tblName;
- config.basePath = localCluster.tablePath(dbName, tblName);
- config.assumeDatePartitioning = true;
- config.usePreApacheInputFormat = false;
- config.partitionFields = Collections.singletonList("datestr");
+ config.hiveSyncConfigParams.hiveUser = System.getProperty("user.name");
+ config.hiveSyncConfigParams.hivePass = "";
+ config.hoodieSyncConfigParams.databaseName = dbName;
+ config.hoodieSyncConfigParams.tableName = tblName;
+ config.hoodieSyncConfigParams.basePath = localCluster.tablePath(dbName, tblName);
+ config.hoodieSyncConfigParams.assumeDatePartitioning = true;
+ config.hiveSyncConfigParams.usePreApacheInputFormat = false;
+ config.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
return config;
}
private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
Assertions.assertEquals(localCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
+ .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
+ .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
}
@@ -116,11 +116,11 @@ public class TestHiveSyncGlobalCommitTool {
remoteCluster.stopHiveServer2();
Assertions.assertFalse(tool.commit());
Assertions.assertEquals(commitTime, localCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
+ .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
Assertions.assertTrue(tool.rollback()); // do a rollback
Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
- .getTable(config.databaseName, config.tableName).getParameters()
+ .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
remoteCluster.startHiveServer2();
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 167c35a124..3377917ffe 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
index b6adcb2982..0de27d3267 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
@@ -80,32 +80,32 @@ public class HiveSyncFunctionalTestHarness {
public HiveSyncConfig hiveSyncConf() throws IOException {
HiveSyncConfig conf = new HiveSyncConfig();
- conf.jdbcUrl = hiveTestService.getJdbcHive2Url();
- conf.hiveUser = "";
- conf.hivePass = "";
- conf.databaseName = "hivesynctestdb";
- conf.tableName = "hivesynctesttable";
- conf.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
- conf.assumeDatePartitioning = true;
- conf.usePreApacheInputFormat = false;
- conf.partitionFields = Collections.singletonList("datestr");
+ conf.hiveSyncConfigParams.jdbcUrl = hiveTestService.getJdbcHive2Url();
+ conf.hiveSyncConfigParams.hiveUser = "";
+ conf.hiveSyncConfigParams.hivePass = "";
+ conf.hoodieSyncConfigParams.databaseName = "hivesynctestdb";
+ conf.hoodieSyncConfigParams.tableName = "hivesynctesttable";
+ conf.hoodieSyncConfigParams.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
+ conf.hoodieSyncConfigParams.assumeDatePartitioning = true;
+ conf.hiveSyncConfigParams.usePreApacheInputFormat = false;
+ conf.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
return conf;
}
public HoodieHiveClient hiveClient(HiveSyncConfig hiveSyncConfig) throws IOException {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
- .setTableName(hiveSyncConfig.tableName)
+ .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
.setPayloadClass(HoodieAvroPayload.class)
- .initTable(hadoopConf, hiveSyncConfig.basePath);
+ .initTable(hadoopConf, hiveSyncConfig.hoodieSyncConfigParams.basePath);
return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs());
}
public void dropTables(String database, String... tables) throws IOException, HiveException, MetaException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
- hiveSyncConfig.databaseName = database;
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
for (String table : tables) {
- hiveSyncConfig.tableName = table;
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = table;
new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop table if exists " + table);
}
}
@@ -113,7 +113,7 @@ public class HiveSyncFunctionalTestHarness {
public void dropDatabases(String... databases) throws IOException, HiveException, MetaException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
for (String database : databases) {
- hiveSyncConfig.databaseName = database;
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop database if exists " + database);
}
}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
similarity index 58%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 8eec327890..54e151dd69 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -18,15 +18,12 @@
package org.apache.hudi.sync.common;
-import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,21 +31,18 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
-public abstract class AbstractSyncHoodieClient implements AutoCloseable {
+public abstract class HoodieSyncClient implements AutoCloseable {
- private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+ private static final Logger LOG = LogManager.getLogger(HoodieSyncClient.class);
public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
- public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {};
protected final HoodieTableMetaClient metaClient;
protected final HoodieTableType tableType;
@@ -59,13 +53,13 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
private final boolean withOperationField;
@Deprecated
- public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
- boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
+ public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+ boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
this(basePath, assumeDatePartitioning, useFileListingFromMetadata, withOperationField, fs);
}
- public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
- boolean withOperationField, FileSystem fs) {
+ public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+ boolean withOperationField, FileSystem fs) {
this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.tableType = metaClient.getTableType();
this.basePath = basePath;
@@ -75,47 +69,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
this.fs = fs;
}
- /**
- * Create the table.
- * @param tableName The table name.
- * @param storageSchema The table schema.
- * @param inputFormatClass The input format class of this table.
- * @param outputFormatClass The output format class of this table.
- * @param serdeClass The serde class of this table.
- * @param serdeProperties The serde properties of this table.
- * @param tableProperties The table properties for this table.
- */
- public abstract void createTable(String tableName, MessageType storageSchema,
- String inputFormatClass, String outputFormatClass,
- String serdeClass, Map<String, String> serdeProperties,
- Map<String, String> tableProperties);
-
- /**
- * @deprecated Use {@link #tableExists} instead.
- */
- @Deprecated
- public abstract boolean doesTableExist(String tableName);
-
- public abstract boolean tableExists(String tableName);
-
- public abstract Option<String> getLastCommitTimeSynced(String tableName);
-
- public abstract void updateLastCommitTimeSynced(String tableName);
-
- public abstract Option<String> getLastReplicatedTime(String tableName);
-
- public abstract void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
-
- public abstract void deleteLastReplicatedTimeStamp(String tableName);
-
- public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
-
- public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
-
- public abstract void dropPartitions(String tableName, List<String> partitionsToDrop);
-
- public void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
-
public abstract Map<String, String> getTableSchema(String tableName);
public HoodieTableType getTableType() {
@@ -194,56 +147,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
}
}
- public abstract static class TypeConverter implements Serializable {
-
- static final String DEFAULT_TARGET_TYPE = "DECIMAL";
-
- protected String targetType;
-
- public TypeConverter() {
- this.targetType = DEFAULT_TARGET_TYPE;
- }
-
- public TypeConverter(String targetType) {
- ValidationUtils.checkArgument(Objects.nonNull(targetType));
- this.targetType = targetType;
- }
-
- public void doConvert(ResultSet resultSet, Map<String, String> schema) throws SQLException {
- schema.put(getColumnName(resultSet), targetType.equalsIgnoreCase(getColumnType(resultSet))
- ? convert(resultSet) : getColumnType(resultSet));
- }
-
- public String convert(ResultSet resultSet) throws SQLException {
- String columnType = getColumnType(resultSet);
- int columnSize = resultSet.getInt("COLUMN_SIZE");
- int decimalDigits = resultSet.getInt("DECIMAL_DIGITS");
- return columnType + String.format("(%s,%s)", columnSize, decimalDigits);
- }
-
- public String getColumnName(ResultSet resultSet) throws SQLException {
- return resultSet.getString(4);
- }
-
- public String getColumnType(ResultSet resultSet) throws SQLException {
- return resultSet.getString(6);
- }
- }
-
- /**
- * Read the schema from the log file on path.
- */
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
- MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);
- // Fall back to read the schema from last compaction
- if (messageType == null) {
- LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
- return new TableSchemaResolver(this.metaClient).readSchemaFromLastCompaction(lastCompactionCommitOpt);
- }
- return messageType;
- }
-
/**
* Partition Event captures any partition that needs to be added or updated.
*/
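Downstream clients now extend HoodieSyncClient; the create/exists/partition/property operations removed above reappear as the operation interfaces added later in this patch. A hypothetical skeleton, assuming getTableSchema and close() are all that remain abstract:

import java.util.Collections;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.sync.common.HoodieSyncClient;

public class NoOpSyncClient extends HoodieSyncClient {

  public NoOpSyncClient(String basePath, FileSystem fs) {
    // assumeDatePartitioning=false, useFileListingFromMetadata=false, withOperationField=false;
    // basePath must point at an existing Hudi table, since the base constructor builds a meta client
    super(basePath, false, false, false, fs);
  }

  @Override
  public Map<String, String> getTableSchema(String tableName) {
    return Collections.emptyMap(); // placeholder: a real client reads this from its catalog
  }

  @Override
  public void close() {
    // nothing to release in this sketch
  }
}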
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index dc2b21ba45..6296958907 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import com.beust.jcommander.Parameter;
+import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
@@ -37,40 +38,7 @@ import java.util.function.Function;
*/
public class HoodieSyncConfig extends HoodieConfig {
- @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
- public String databaseName;
-
- @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
- public String tableName;
-
- @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
- public String basePath;
-
- @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
- public String baseFileFormat;
-
- @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
- public List<String> partitionFields;
-
- @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
- + "to extract the partition values from HDFS path")
- public String partitionValueExtractorClass;
-
- @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
- + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
- public Boolean assumeDatePartitioning;
-
- @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
- public Boolean decodePartition;
-
- @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
- public Boolean useFileListingFromMetadata;
-
- @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
- public Boolean isConditionalSync;
-
- @Parameter(names = {"--spark-version"}, description = "The spark version")
- public String sparkVersion;
+ public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
.key("hoodie.datasource.meta.sync.base.path")
@@ -169,20 +137,50 @@ public class HoodieSyncConfig extends HoodieConfig {
super(props);
setDefaults();
- this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
- this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
- this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
- this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
- this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
- this.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
- this.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
- this.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
- this.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
- this.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
- this.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
+ this.hoodieSyncConfigParams.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
+ this.hoodieSyncConfigParams.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
+ this.hoodieSyncConfigParams.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
+ this.hoodieSyncConfigParams.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
+ this.hoodieSyncConfigParams.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
+ this.hoodieSyncConfigParams.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
+ this.hoodieSyncConfigParams.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
+ this.hoodieSyncConfigParams.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
+ this.hoodieSyncConfigParams.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
+ this.hoodieSyncConfigParams.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
+ this.hoodieSyncConfigParams.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
}
protected void setDefaults() {
this.setDefaultValue(META_SYNC_TABLE_NAME);
}
+
+ public static class HoodieSyncConfigParams implements Serializable {
+ @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
+ public String databaseName;
+ @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
+ public String tableName;
+ @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
+ public String basePath;
+ @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+ public String baseFileFormat;
+ @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+ public List<String> partitionFields;
+ @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+ + "to extract the partition values from HDFS path")
+ public String partitionValueExtractorClass;
+ @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+ + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+ public Boolean assumeDatePartitioning;
+ @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
+ public Boolean decodePartition;
+ @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+ public Boolean useFileListingFromMetadata;
+ @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+ public Boolean isConditionalSync;
+ @Parameter(names = {"--spark-version"}, description = "The spark version")
+ public String sparkVersion;
+
+ public HoodieSyncConfigParams() {
+ }
+ }
}
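One thing worth verifying with this move: JCommander binds @Parameter fields of a nested holder only when the field carries @ParametersDelegate, and the new hoodieSyncConfigParams field is not annotated in this hunk. A self-contained sketch of the delegate mechanism (the demo class is hypothetical):

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;

public class DelegateDemo {

  public static class Params {
    @Parameter(names = {"--database"}, description = "target database")
    public String databaseName;
  }

  @ParametersDelegate
  public final Params params = new Params();

  public static void main(String[] args) {
    DelegateDemo cfg = new DelegateDemo();
    new JCommander(cfg, null, new String[] {"--database", "default"});
    System.out.println(cfg.params.databaseName); // prints: default
  }
}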
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
new file mode 100644
index 0000000000..754d142695
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Properties;
+
+/**
+ * Base class to sync Hudi metadata with metastores to make
+ * Hudi tables queryable through external systems.
+ */
+public abstract class HoodieSyncTool {
+ protected final Configuration conf;
+ protected final FileSystem fs;
+ protected TypedProperties props;
+
+ public HoodieSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
+ this.props = props;
+ this.conf = conf;
+ this.fs = fs;
+ }
+
+ @Deprecated
+ public HoodieSyncTool(Properties props, FileSystem fileSystem) {
+ this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
+ }
+
+ public abstract void syncHoodieTable();
+
+}
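Subclassing the relocated base class is unchanged apart from the name; the ValidMetaSyncClass test fixture further down does exactly this. A minimal sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.common.HoodieSyncTool;

public class LoggingSyncTool extends HoodieSyncTool {

  public LoggingSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
    super(props, conf, fs); // the 3-arg shape SyncUtilHelpers probes for first
  }

  @Override
  public void syncHoodieTable() {
    System.out.println("would sync table metadata using props: " + props);
  }
}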
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
new file mode 100644
index 0000000000..4e29a57a08
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
@@ -0,0 +1,7 @@
+package org.apache.hudi.sync.common;
+
+public interface SupportMetaSync {
+ default void runMetaSync() {
+
+ }
+}
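The default runMetaSync body is a no-op, so writers opt in by overriding it, which is what DeltaSync does at the end of this patch. A hypothetical implementer:

import org.apache.hudi.sync.common.SupportMetaSync;

public class MyWriter implements SupportMetaSync {

  @Override
  public void runMetaSync() {
    // hypothetical: iterate the configured sync tool classes and run each,
    // as DeltaSync#runMetaSync does at the end of this patch
    System.out.println("running meta sync");
  }

  public void commitBatch() {
    // ... persist the batch, then publish metadata:
    runMetaSync();
  }
}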
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
new file mode 100644
index 0000000000..fc5e093d7e
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
@@ -0,0 +1,32 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Map;
+
+public interface CatalogSync {
+ /**
+ * Create the table.
+ *
+ * @param tableName The table name.
+ * @param storageSchema The table schema.
+ * @param inputFormatClass The input format class of this table.
+ * @param outputFormatClass The output format class of this table.
+ * @param serdeClass The serde class of this table.
+ * @param serdeProperties The serde properties of this table.
+ * @param tableProperties The table properties for this table.
+ */
+ void createTable(String tableName, MessageType storageSchema,
+ String inputFormatClass, String outputFormatClass,
+ String serdeClass, Map<String, String> serdeProperties,
+ Map<String, String> tableProperties);
+
+ /**
+ * @deprecated Use {@link #tableExists} instead.
+ */
+ @Deprecated
+ boolean doesTableExist(String tableName);
+
+ boolean tableExists(String tableName);
+
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
new file mode 100644
index 0000000000..c5b1edd4cd
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import java.util.List;
+
+public interface PartitionsSync {
+ void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+ void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+ void dropPartitions(String tableName, List<String> partitionsToDrop);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
new file mode 100644
index 0000000000..a93a9481ce
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+public interface ReplicatedTimeSync {
+ Option<String> getLastReplicatedTime(String tableName);
+
+ void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
+
+ void deleteLastReplicatedTimeStamp(String tableName);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
new file mode 100644
index 0000000000..c145e95532
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
@@ -0,0 +1,13 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.Map;
+
+public interface TblPropertiesSync {
+ Option<String> getLastCommitTimeSynced(String tableName);
+
+ void updateLastCommitTimeSynced(String tableName);
+
+ void updateTableProperties(String tableName, Map<String, String> tableProperties);
+}
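Together with CatalogSync, PartitionsSync, and ReplicatedTimeSync above, this carves the old monolithic client API into capabilities a client can mix in selectively; a concrete client would typically extend HoodieSyncClient and implement several of them. A self-contained sketch of the smallest one, with a hypothetical in-memory store:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.sync.common.operation.TblPropertiesSync;

public class InMemoryTblPropertiesSync implements TblPropertiesSync {

  private final Map<String, Map<String, String>> tableProps = new HashMap<>();
  private final Map<String, String> lastSynced = new HashMap<>();

  @Override
  public Option<String> getLastCommitTimeSynced(String tableName) {
    return Option.ofNullable(lastSynced.get(tableName));
  }

  @Override
  public void updateLastCommitTimeSynced(String tableName) {
    // a real client would read the latest completed instant off the Hudi timeline
    lastSynced.put(tableName, "19700101000000");
  }

  @Override
  public void updateTableProperties(String tableName, Map<String, String> props) {
    tableProps.computeIfAbsent(tableName, k -> new HashMap<>()).putAll(props);
  }
}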
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
similarity index 63%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
index 972ae1f96c..bcf572e088 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
@@ -1,29 +1,6 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+package org.apache.hudi.sync.common.util;
-package org.apache.hudi.sync.common;
-
-import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -33,40 +10,18 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-/**
- * Base class to sync Hudi meta data with Metastores to make
- * Hudi table queryable through external systems.
- */
-public abstract class AbstractSyncTool {
- protected final Configuration conf;
- protected final FileSystem fs;
- protected TypedProperties props;
-
- public AbstractSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
- this.props = props;
- this.conf = conf;
- this.fs = fs;
- }
-
- @Deprecated
- public AbstractSyncTool(Properties props, FileSystem fileSystem) {
- this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
- }
-
- public abstract void syncHoodieTable();
-
+public class SparkDataSourceTableUtils {
/**
* Get Spark Sql related table properties. This is used for spark datasource table.
* @param schema The schema to write to the table.
* @return A new parameters added the spark's table properties.
*/
- protected Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
- int schemaLengthThreshold, MessageType schema) {
+ public static Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
+ int schemaLengthThreshold, MessageType schema) {
// Convert the schema and partition info used by spark sql to hive table properties.
// The following code refers to the spark code in
// https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -122,7 +77,7 @@ public abstract class AbstractSyncTool {
return sparkProperties;
}
- protected Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
+ public static Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
Map<String, String> sparkSerdeProperties = new HashMap<>();
sparkSerdeProperties.put("path", basePath);
sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index def85c5b80..39cc56c04c 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -22,7 +22,7 @@ package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +39,7 @@ public class SyncUtilHelpers {
private static final Logger LOG = LogManager.getLogger(SyncUtilHelpers.class);
/**
- * Create an instance of an implementation of {@link AbstractSyncTool} that will sync all the relevant meta information
+ * Create an instance of an implementation of {@link HoodieSyncTool} that will sync all the relevant meta information
* with an external metastore such as Hive etc. to ensure Hoodie tables can be queried or read via external systems.
*
* @param metaSyncFQCN The class that implements the sync of the metadata.
@@ -62,12 +62,12 @@ public class SyncUtilHelpers {
}
}
- static AbstractSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
- TypedProperties props,
- Configuration hadoopConfig,
- FileSystem fs,
- String targetBasePath,
- String baseFileFormat) {
+ static HoodieSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
+ TypedProperties props,
+ Configuration hadoopConfig,
+ FileSystem fs,
+ String targetBasePath,
+ String baseFileFormat) {
TypedProperties properties = new TypedProperties();
properties.putAll(props);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
@@ -75,13 +75,13 @@ public class SyncUtilHelpers {
if (ReflectionUtils.hasConstructor(metaSyncFQCN,
new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
- return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+ return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
properties, hadoopConfig, fs));
} else {
LOG.warn("Falling back to deprecated constructor for class: " + metaSyncFQCN);
try {
- return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+ return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
new Class<?>[] {Properties.class, FileSystem.class}, properties, fs));
} catch (Throwable t) {
throw new HoodieException("Could not load meta sync class " + metaSyncFQCN, t);
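A sketch of driving the reflective lookup above from the same package, as the tests below do; the sync tool FQCN is hypothetical and must name a class with one of the two constructor shapes probed here:

package org.apache.hudi.sync.common.util; // instantiateMetaSyncTool is package-private

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.sync.common.HoodieSyncTool;

public class InstantiateDemo {
  public static void main(String[] args) throws Exception {
    Configuration hadoopConf = new Configuration();
    FileSystem fs = FileSystem.getLocal(hadoopConf);
    HoodieSyncTool tool = SyncUtilHelpers.instantiateMetaSyncTool(
        "org.example.LoggingSyncTool", // hypothetical FQCN of a HoodieSyncTool subclass
        new TypedProperties(), hadoopConf, fs,
        "file:///tmp/hudi_trips", "PARQUET");
    tool.syncHoodieTable();
  }
}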
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
index dc9dee8b42..4e8757d142 100644
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
@@ -20,7 +20,7 @@ package org.apache.hudi.sync.common.util;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +48,7 @@ public class TestSyncUtilHelpers {
@Test
public void testCreateValidSyncClass() {
- AbstractSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
+ HoodieSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
ValidMetaSyncClass.class.getName(),
new TypedProperties(),
hadoopConf,
@@ -60,13 +60,13 @@ public class TestSyncUtilHelpers {
}
/**
- * Ensure it still works for the deprecated constructor of {@link AbstractSyncTool}
+ * Ensure it still works for the deprecated constructor of {@link HoodieSyncTool}
* as we implemented the fallback.
*/
@Test
public void testCreateDeprecatedSyncClass() {
Properties properties = new Properties();
- AbstractSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
+ HoodieSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
DeprecatedMetaSyncClass.class.getName(),
new TypedProperties(properties),
hadoopConf,
@@ -95,7 +95,7 @@ public class TestSyncUtilHelpers {
}
- public static class ValidMetaSyncClass extends AbstractSyncTool {
+ public static class ValidMetaSyncClass extends HoodieSyncTool {
public ValidMetaSyncClass(TypedProperties props, Configuration conf, FileSystem fs) {
super(props, conf, fs);
}
@@ -106,7 +106,7 @@ public class TestSyncUtilHelpers {
}
}
- public static class DeprecatedMetaSyncClass extends AbstractSyncTool {
+ public static class DeprecatedMetaSyncClass extends HoodieSyncTool {
public DeprecatedMetaSyncClass(Properties props, FileSystem fileSystem) {
super(props, fileSystem);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 736e416162..ef03079d05 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -60,6 +60,7 @@ import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.sync.common.SupportMetaSync;
import org.apache.hudi.sync.common.util.SyncUtilHelpers;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -122,7 +123,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
/**
* Sync's one batch of data to hoodie table.
*/
-public class DeltaSync implements Serializable {
+public class DeltaSync implements SupportMetaSync, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -629,7 +630,7 @@ public class DeltaSync implements Serializable {
}
if (!isEmpty) {
- syncMeta(metrics);
+ runMetaSync();
}
} else {
LOG.info("Commit " + instantTime + " failed!");
@@ -690,7 +691,7 @@ public class DeltaSync implements Serializable {
return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
}
- private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+ public void runMetaSync() {
Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
// for backward compatibility
if (cfg.enableHiveSync) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index ae38968187..bd979b5f62 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1355,13 +1355,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Test Hive integration
HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
- hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
- assertTrue(hiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
- assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.tableName).size(),
+ assertTrue(hiveClient.tableExists(hiveSyncConfig.hoodieSyncConfigParams.tableName), "Table " + hiveSyncConfig.hoodieSyncConfigParams.tableName + " should exist");
+ assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.hoodieSyncConfigParams.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(lastInstantForUpstreamTable,
- hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ hiveClient.getLastCommitTimeSynced(hiveSyncConfig.hoodieSyncConfigParams.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 7df6e11014..fec1ed6511 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -184,15 +184,15 @@ public class UtilitiesTestBase {
*/
protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
- hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
- hiveSyncConfig.hiveUser = "";
- hiveSyncConfig.hivePass = "";
- hiveSyncConfig.databaseName = "testdb1";
- hiveSyncConfig.tableName = tableName;
- hiveSyncConfig.basePath = basePath;
- hiveSyncConfig.assumeDatePartitioning = false;
- hiveSyncConfig.usePreApacheInputFormat = false;
- hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("datestr");
+ hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
+ hiveSyncConfig.hiveSyncConfigParams.hiveUser = "";
+ hiveSyncConfig.hiveSyncConfigParams.hivePass = "";
+ hiveSyncConfig.hoodieSyncConfigParams.databaseName = "testdb1";
+ hiveSyncConfig.hoodieSyncConfigParams.tableName = tableName;
+ hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+ hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = false;
+ hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+ hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("datestr");
return hiveSyncConfig;
}
@@ -208,12 +208,12 @@ public class UtilitiesTestBase {
hiveConf.addResource(hiveServer.getHiveConf());
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
- .setTableName(hiveSyncConfig.tableName)
- .initTable(dfs.getConf(), hiveSyncConfig.basePath);
+ .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
+ .initTable(dfs.getConf(), hiveSyncConfig.hoodieSyncConfigParams.basePath);
QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig, dfs);
- ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName);
- ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName);
+ ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+ ddlExecutor.runSQL("create database " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
ddlExecutor.close();
}