You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:31:00 UTC

[42/50] incubator-gobblin git commit: [GOBBLIN-414] Added lineage event for convertible hive datasets

[GOBBLIN-414] Added lineage event for convertible hive datasets

Closes #2290 from aditya1105/metadata


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/faa27f41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/faa27f41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/faa27f41

Branch: refs/heads/0.12.0
Commit: faa27f41f00f1d142c128e13a3da0f8c388d83b9
Parents: 5e6bfb0
Author: aditya1105 <ad...@linkedin.com>
Authored: Thu Mar 1 07:34:58 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Mar 1 07:34:58 2018 -0800

----------------------------------------------------------------------
 .../conversion/hive/source/HiveSource.java      | 54 +++++++++++++++++-
 .../dataset/ConvertibleHiveDatasetTest.java     | 60 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/faa27f41/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 3ad99fd..4cee48f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -27,6 +27,13 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -148,6 +155,7 @@ public class HiveSource implements Source {
   protected long maxLookBackTime;
   protected long beginGetWorkunitsTime;
   protected List<String> ignoreDataPathIdentifierList;
+  protected SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker;
 
   protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver =
       new ClassAliasResolver<>(HiveBaseExtractorFactory.class);
@@ -214,6 +222,7 @@ public class HiveSource implements Source {
     this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis();
     this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY,
         DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
+    this.sharedJobBroker = state.getBroker();
 
     silenceHiveLoggers();
   }
@@ -252,7 +261,10 @@ public class HiveSource implements Source {
 
         EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(),
             this.beginGetWorkunitsTime);
-
+        if (hiveDataset instanceof ConvertibleHiveDataset) {
+          setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
+          log.info("Added lineage event for dataset " + hiveDataset.getUrn());
+        }
         this.workunits.add(hiveWorkUnit);
         log.debug(String.format("Workunit added for table: %s", hiveWorkUnit));
 
@@ -281,7 +293,7 @@ public class HiveSource implements Source {
   }
 
   protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
-
+    boolean setLineageInfo = false;
     long tableProcessTime = new DateTime().getMillis();
     this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime);
 
@@ -329,7 +341,12 @@ public class HiveSource implements Source {
 
           EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
               lowWatermark.getValue(), this.beginGetWorkunitsTime);
-
+          if (hiveDataset instanceof ConvertibleHiveDataset && !setLineageInfo) {
+            setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
+            log.info("Added lineage event for dataset " + hiveDataset.getUrn());
+            // Add lineage information only once per hive table
+            setLineageInfo = true;
+          }
           workunits.add(hiveWorkUnit);
           log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s",
               sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue()));
@@ -474,4 +491,35 @@ public class HiveSource implements Source {
   private boolean isAvro(Table table) {
     return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
   }
+
+  public static void setLineageInfo(ConvertibleHiveDataset convertibleHiveDataset, WorkUnit workUnit,
+      SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker)
+      throws IOException {
+    String sourceTable =
+        convertibleHiveDataset.getTable().getDbName() + "." + convertibleHiveDataset.getTable().getTableName();
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+    source.addMetadata(DatasetConstants.FS_URI,
+        convertibleHiveDataset.getTable().getDataLocation().getFileSystem(new Configuration()).getUri().toString());
+
+    int virtualBranch = 0;
+    for (String format : convertibleHiveDataset.getDestFormats()) {
+      ++virtualBranch;
+      Optional<ConvertibleHiveDataset.ConversionConfig> conversionConfigForFormat =
+          convertibleHiveDataset.getConversionConfigForFormat(format);
+      Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(sharedJobBroker);
+      if (!lineageInfo.isPresent()) {
+        continue;
+      } else if (!conversionConfigForFormat.isPresent()) {
+        continue;
+      }
+      String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get()
+          .getDestinationTableName();
+      DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable);
+      Path destPath = new Path(conversionConfigForFormat.get().getDestinationDataPath());
+      dest.addMetadata(DatasetConstants.FS_URI, destPath.getFileSystem(new Configuration()).getUri().toString());
+
+      lineageInfo.get().setSource(source, workUnit);
+      lineageInfo.get().putDestination(dest, virtualBranch, workUnit);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/faa27f41/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index 5021d4d..c399264 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -19,8 +19,20 @@ package org.apache.gobblin.data.management.conversion.hive.dataset;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Properties;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -31,6 +43,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.gson.Gson;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -45,6 +58,44 @@ import static org.mockito.Mockito.when;
 public class ConvertibleHiveDatasetTest {
 
   @Test
+  public void testLineageInfo()
+      throws Exception {
+    String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf";
+    Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    Gson GSON = new Gson();
+    HiveSource.setLineageInfo(createTestConvertibleDataset(config), workUnit, getSharedJobBroker());
+    Properties props = workUnit.getSpecProperties();
+    // Assert that lineage name is correct
+    Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "db1.tb1");
+
+    // Assert that source is correct for lineage event
+    Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
+    DatasetDescriptor sourceDD =
+        GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class);
+    Assert.assertEquals(sourceDD.getPlatform(), DatasetConstants.PLATFORM_HIVE);
+    Assert.assertEquals(sourceDD.getName(), "db1.tb1");
+
+    // Assert that first dest is correct for lineage event
+    Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
+    DatasetDescriptor destDD1 =
+        GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class);
+    Assert.assertEquals(destDD1.getPlatform(), DatasetConstants.PLATFORM_HIVE);
+    Assert.assertEquals(destDD1.getName(), "db1_nestedOrcDb.tb1_nestedOrc");
+
+    // Assert that second dest is correct for lineage event
+    Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
+    DatasetDescriptor destDD2 =
+        GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class);
+    Assert.assertEquals(destDD2.getPlatform(), DatasetConstants.PLATFORM_HIVE);
+    Assert.assertEquals(destDD2.getName(), "db1_flattenedOrcDb.tb1_flattenedOrc");
+
+    // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
+    Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit));
+    Assert.assertEquals(lineageEventBuilders.size(), 2);
+  }
+
+  @Test
   public void testFlattenedOrcConfig() throws Exception {
     String testConfFilePath = "convertibleHiveDatasetTest/flattenedOrc.conf";
     Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
@@ -181,4 +232,13 @@ public class ConvertibleHiveDatasetTest {
     table.setSd(sd);
     return table;
   }
+
+  public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker() {
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
+        .newSubscopedBuilder(new JobScopeInstance("ConvertibleHiveDatasetLineageEventTest", String.valueOf(System.currentTimeMillis())))
+        .build();
+    return jobBroker;
+  }
 }