You are viewing a plain text version of this content; the hyperlink to the canonical version was removed during text extraction and is available in the mailing-list archive.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/13 03:56:55 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-958] make hive
flow edge accept multiple tables
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a714c94 [GOBBLIN-958] make hive flow edge accept multiple tables
a714c94 is described below
commit a714c943bddfe49c19830444434aa64501294b2d
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Nov 12 19:56:47 2019 -0800
[GOBBLIN-958] make hive flow edge accept multiple tables
Closes #2807 from arjun4084346/hiveMultiDataset
---
.../modules/dataset/HiveDatasetDescriptor.java | 70 +++++++++++++++++++---
.../modules/dataset/SqlDatasetDescriptor.java | 4 +-
2 files changed, 65 insertions(+), 9 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
index 3cc2665..ce28efb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -18,7 +18,12 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+import org.apache.hadoop.fs.GlobPattern;
+
+import com.google.common.base.Splitter;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
@@ -27,6 +32,8 @@ import lombok.EqualsAndHashCode;
import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
import org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersionFinder;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
/**
* As of now, {@link HiveDatasetDescriptor} has same implementation as that of {@link SqlDatasetDescriptor}.
@@ -39,9 +46,11 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
static final String PARTITION_COLUMN = "partition.column";
static final String PARTITION_FORMAT = "partition.format";
static final String CONFLICT_POLICY = "conflict.policy";
+ static final String WHITELIST_TABLES = "whitelist.tables";
private final boolean isPartitioned;
private final String partitionColumn;
private final String partitionFormat;
+ private final String conflictPolicy;
public HiveDatasetDescriptor(Config config) throws IOException {
super(config);
@@ -50,16 +59,22 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
if (isPartitioned) {
partitionColumn = ConfigUtils.getString(config, PARTITION_COLUMN, DatePartitionHiveVersionFinder.DEFAULT_PARTITION_KEY_NAME);
partitionFormat = ConfigUtils.getString(config, PARTITION_FORMAT, DatePartitionHiveVersionFinder.DEFAULT_PARTITION_VALUE_DATE_TIME_PATTERN);
- this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
- ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name())));
- this.setRawConfig(this.getRawConfig().withValue(PARTITION_COLUMN, ConfigValueFactory.fromAnyRef(partitionColumn)));
- this.setRawConfig(this.getRawConfig().withValue(PARTITION_FORMAT, ConfigValueFactory.fromAnyRef(partitionFormat)));
+ conflictPolicy = HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name();
} else {
partitionColumn = "";
partitionFormat = "";
- this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
- ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name())));
+ conflictPolicy = HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name();
}
+ this.setRawConfig(this.getRawConfig()
+ .withValue(CONFLICT_POLICY, ConfigValueFactory.fromAnyRef(conflictPolicy))
+ .withValue(PARTITION_COLUMN, ConfigValueFactory.fromAnyRef(partitionColumn))
+ .withValue(PARTITION_FORMAT, ConfigValueFactory.fromAnyRef(partitionFormat))
+ .withValue(WHITELIST_TABLES, ConfigValueFactory.fromAnyRef(createWhitelistedTables())
+ ));
+ }
+
+ private String createWhitelistedTables() {
+ return this.tableName.replace(',', '|');
}
@Override
@@ -69,6 +84,47 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
@Override
protected boolean isPathContaining(DatasetDescriptor other) {
- return super.isPathContaining(other) && this.isPartitioned == ((HiveDatasetDescriptor) other).isPartitioned;
+ String otherPath = other.getPath();
+ if (otherPath == null) {
+ return false;
+ }
+
+ if (this.isPartitioned != ((HiveDatasetDescriptor) other).isPartitioned) {
+ return false;
+ }
+
+ //Extract the dbName and tableName from otherPath
+ List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+ if (parts.size() != 2) {
+ return false;
+ }
+
+ String otherDbName = parts.get(0);
+ String otherTableNames = parts.get(1);
+
+ if(!Pattern.compile(this.databaseName).matcher(otherDbName).matches()) {
+ return false;
+ }
+
+ List<String> tables = Splitter.on(",").splitToList(otherTableNames);
+ for (String table : tables) {
+ if (!isPathContaining(table)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isPathContaining(String tableName) {
+ if (tableName == null) {
+ return false;
+ }
+
+ if (PathUtils.GLOB_TOKENS.matcher(tableName).find()) {
+ return false;
+ }
+
+ GlobPattern globPattern = new GlobPattern(this.tableName);
+ return globPattern.matches(tableName);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 37e3e5a..42b68d9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -44,8 +44,8 @@ import org.apache.gobblin.util.PathUtils;
public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
protected static final String SEPARATION_CHAR = ";";
- private final String databaseName;
- private final String tableName;
+ protected final String databaseName;
+ protected final String tableName;
@Getter
private final String path;