You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/13 03:56:55 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-958] make hive flow edge accept multiple tables

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a714c94  [GOBBLIN-958] make hive flow edge accept multiple tables
a714c94 is described below

commit a714c943bddfe49c19830444434aa64501294b2d
Author: Arjun <ab...@linkedin.com>
AuthorDate: Tue Nov 12 19:56:47 2019 -0800

    [GOBBLIN-958] make hive flow edge accept multiple tables
    
    Closes #2807 from arjun4084346/hiveMultiDataset
---
 .../modules/dataset/HiveDatasetDescriptor.java     | 70 +++++++++++++++++++---
 .../modules/dataset/SqlDatasetDescriptor.java      |  4 +-
 2 files changed, 65 insertions(+), 9 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
index 3cc2665..ce28efb 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -18,7 +18,12 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.GlobPattern;
+
+import com.google.common.base.Splitter;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -27,6 +32,8 @@ import lombok.EqualsAndHashCode;
 import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
 import org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersionFinder;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
 
 /**
  * As of now, {@link HiveDatasetDescriptor} has same implementation as that of {@link SqlDatasetDescriptor}.
@@ -39,9 +46,11 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
   static final String PARTITION_COLUMN = "partition.column";
   static final String PARTITION_FORMAT = "partition.format";
   static final String CONFLICT_POLICY = "conflict.policy";
+  static final String WHITELIST_TABLES = "whitelist.tables";
   private final boolean isPartitioned;
   private final String partitionColumn;
   private final String partitionFormat;
+  private final String conflictPolicy;
 
   public HiveDatasetDescriptor(Config config) throws IOException {
     super(config);
@@ -50,16 +59,22 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
     if (isPartitioned) {
       partitionColumn = ConfigUtils.getString(config, PARTITION_COLUMN, DatePartitionHiveVersionFinder.DEFAULT_PARTITION_KEY_NAME);
       partitionFormat = ConfigUtils.getString(config, PARTITION_FORMAT, DatePartitionHiveVersionFinder.DEFAULT_PARTITION_VALUE_DATE_TIME_PATTERN);
-      this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
-          ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name())));
-      this.setRawConfig(this.getRawConfig().withValue(PARTITION_COLUMN, ConfigValueFactory.fromAnyRef(partitionColumn)));
-      this.setRawConfig(this.getRawConfig().withValue(PARTITION_FORMAT, ConfigValueFactory.fromAnyRef(partitionFormat)));
+      conflictPolicy = HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name();
     } else {
       partitionColumn = "";
       partitionFormat = "";
-      this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
-          ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name())));
+      conflictPolicy = HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name();
     }
+    this.setRawConfig(this.getRawConfig()
+        .withValue(CONFLICT_POLICY, ConfigValueFactory.fromAnyRef(conflictPolicy))
+        .withValue(PARTITION_COLUMN, ConfigValueFactory.fromAnyRef(partitionColumn))
+        .withValue(PARTITION_FORMAT, ConfigValueFactory.fromAnyRef(partitionFormat))
+        .withValue(WHITELIST_TABLES, ConfigValueFactory.fromAnyRef(createWhitelistedTables())
+        ));
+  }
+
+  private String createWhitelistedTables() {
+    return this.tableName.replace(',', '|');
   }
 
   @Override
@@ -69,6 +84,47 @@ public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
 
   @Override
   protected boolean isPathContaining(DatasetDescriptor other) {
-    return super.isPathContaining(other) && this.isPartitioned == ((HiveDatasetDescriptor) other).isPartitioned;
+    String otherPath = other.getPath();
+    if (otherPath == null) {
+      return false;
+    }
+
+    if (this.isPartitioned != ((HiveDatasetDescriptor) other).isPartitioned) {
+      return false;
+    }
+
+    //Extract the dbName and tableName from otherPath
+    List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+    if (parts.size() != 2) {
+      return false;
+    }
+
+    String otherDbName = parts.get(0);
+    String otherTableNames = parts.get(1);
+
+    if(!Pattern.compile(this.databaseName).matcher(otherDbName).matches()) {
+      return false;
+    }
+
+    List<String> tables = Splitter.on(",").splitToList(otherTableNames);
+    for (String table : tables) {
+      if (!isPathContaining(table)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean isPathContaining(String tableName) {
+    if (tableName == null) {
+      return false;
+    }
+
+    if (PathUtils.GLOB_TOKENS.matcher(tableName).find()) {
+      return false;
+    }
+
+    GlobPattern globPattern = new GlobPattern(this.tableName);
+    return globPattern.matches(tableName);
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 37e3e5a..42b68d9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -44,8 +44,8 @@ import org.apache.gobblin.util.PathUtils;
 public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements DatasetDescriptor {
   protected static final String SEPARATION_CHAR = ";";
 
-  private final String databaseName;
-  private final String tableName;
+  protected final String databaseName;
+  protected final String tableName;
 
   @Getter
   private final String path;