Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/17 03:42:25 UTC

incubator-gobblin git commit: [GOBBLIN-463] Change lineage event for Avro2Orc conversion to have underlying FileSystem as platform

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fc389522a -> 1c03ea22f


[GOBBLIN-463] Change lineage event for Avro2Orc conversion to have underlying FileSystem as platform

Closes #2340 from eogren/aditya-branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1c03ea22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1c03ea22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1c03ea22

Branch: refs/heads/master
Commit: 1c03ea22fa21f44000e7aa73b142fc92c5fc2ba2
Parents: fc38952
Author: Aditya Sharma <ad...@linkedin.com>
Authored: Mon Apr 16 20:42:18 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Apr 16 20:42:18 2018 -0700

----------------------------------------------------------------------
 .../gobblin/dataset/DatasetConstants.java       |  4 ++
 .../dataset/HiveToHdfsDatasetResolver.java      | 50 +++++++++++++++++
 .../HiveToHdfsDatasetResolverFactory.java       | 28 ++++++++++
 .../hive/dataset/ConvertibleHiveDataset.java    | 50 +++++++++++++++++
 .../hive/publisher/HiveConvertPublisher.java    | 32 +++++++++++
 .../hive/source/HiveAvroToOrcSource.java        | 32 ++++++++++-
 .../conversion/hive/source/HiveSource.java      | 54 +-----------------
 .../conversion/hive/utils/LineageUtils.java     | 45 +++++++++++++++
 .../dataset/ConvertibleHiveDatasetTest.java     | 59 ++++++++++++++------
 9 files changed, 283 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
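
In short: the code removed from HiveSource.java below emitted both the source
and destination lineage descriptors with platform "hive" and a raw FileSystem
URI as metadata. After this patch, the Hive descriptors instead carry
fsScheme/fsLocation metadata, and the new HiveToHdfsDatasetResolver rewrites
them so the emitted platform is the underlying FileSystem scheme. A minimal,
illustrative sketch of that resolution follows; it is not part of the commit,
and the values are borrowed from the test at the bottom of this diff:

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.dataset.DatasetConstants;
    import org.apache.gobblin.dataset.DatasetDescriptor;
    import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;

    public class ResolverSketch {
      public static void main(String[] args) {
        // A Hive descriptor as built by ConvertibleHiveDataset.createSourceDataset().
        DatasetDescriptor hive = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, "db1.tb1");
        hive.addMetadata(DatasetConstants.FS_SCHEME, "file");
        hive.addMetadata(DatasetConstants.FS_LOCATION, "/tmp/test");

        // The resolver swaps the platform for the FileSystem scheme and keeps
        // the fully qualified Hive table name as metadata.
        DatasetDescriptor resolved = HiveToHdfsDatasetResolver.INSTANCE.resolve(hive, new State());
        System.out.println(resolved.getPlatform()); // "file"
        System.out.println(resolved.getName());     // "/tmp/test"
        System.out.println(resolved.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE)); // "db1.tb1"
      }
    }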


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
index 03b7fcb..d704525 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -37,4 +37,8 @@ public class DatasetConstants {
 
   /** JDBC metadata */
   public static final String CONNECTION_URL = "connectionUrl";
+
+  /** FileSystem scheme and location */
+  public static final String FS_SCHEME = "fsScheme";
+  public static final String FS_LOCATION = "fsLocation";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
new file mode 100644
index 0000000..0130271
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import org.apache.gobblin.configuration.State;
+
+
+/**
+ * Singleton {@link DatasetResolver} that converts a Hive {@link DatasetDescriptor} to an HDFS {@link DatasetDescriptor}
+ */
+public class HiveToHdfsDatasetResolver implements DatasetResolver {
+  public static final String HIVE_TABLE = "hiveTable";
+  public static final HiveToHdfsDatasetResolver INSTANCE = new HiveToHdfsDatasetResolver();
+
+  private HiveToHdfsDatasetResolver() {
+    // Private constructor to enforce the singleton pattern
+  }
+
+  @Override
+  public DatasetDescriptor resolve(DatasetDescriptor raw, State state) {
+    ImmutableMap<String, String> metadata = raw.getMetadata();
+    Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_SCHEME),
+        String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor",
+            DatasetConstants.FS_SCHEME));
+    Preconditions.checkArgument(metadata.containsKey(DatasetConstants.FS_LOCATION),
+        String.format("Hive Dataset Descriptor must contain metadata %s to create Hdfs Dataset Descriptor",
+            DatasetConstants.FS_LOCATION));
+    DatasetDescriptor datasetDescriptor =
+        new DatasetDescriptor(metadata.get(DatasetConstants.FS_SCHEME), metadata.get(DatasetConstants.FS_LOCATION));
+    datasetDescriptor.addMetadata(HIVE_TABLE, raw.getName());
+    return datasetDescriptor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
new file mode 100644
index 0000000..acd2212
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/HiveToHdfsDatasetResolverFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.dataset;
+
+import com.typesafe.config.Config;
+
+
+public class HiveToHdfsDatasetResolverFactory implements DatasetResolverFactory {
+  @Override
+  public DatasetResolver createResolver(Config config) {
+    return HiveToHdfsDatasetResolver.INSTANCE;
+  }
+}
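
The factory above is how the lineage layer picks up the resolver: the test at
the bottom of this diff wires it in through broker configuration, with the
factory class name under the key shown below. A fragment of that wiring,
where 'dataset' stands in for a ConvertibleHiveDataset built as in the test:

    // Register the resolver factory so emitted Hive descriptors are resolved
    // to FileSystem descriptors before the lineage event is built.
    HiveWorkUnit workUnit = new HiveWorkUnit(dataset);
    workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
        HiveToHdfsDatasetResolverFactory.class.getName());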

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
index f4c8744..63f1bee 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
@@ -16,6 +16,9 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.dataset;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -25,7 +28,11 @@ import lombok.ToString;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
 import com.google.common.base.Optional;
@@ -74,6 +81,14 @@ public class ConvertibleHiveDataset extends HiveDataset {
  // Mapping from destination format to its conversion config
   private final Map<String, ConversionConfig> destConversionConfigs;
 
+  // Source Dataset Descriptor
+  @Getter
+  private final DatasetDescriptor sourceDataset;
+
+  // List of destination Dataset Descriptors
+  @Getter
+  private final List<DatasetDescriptor> destDatasets;
+
   /**
    * <ul>
    *  <li> The constructor takes in a dataset {@link Config} which MUST have a comma separated list of destination formats at key,
@@ -112,6 +127,41 @@ public class ConvertibleHiveDataset extends HiveDataset {
 
       }
     }
+    this.sourceDataset = createSourceDataset();
+    this.destDatasets = createDestDatasets();
+  }
+
+  private List<DatasetDescriptor> createDestDatasets() {
+    List<DatasetDescriptor> destDatasets = new ArrayList<>();
+    for (String format : getDestFormats()) {
+      Optional<ConversionConfig> conversionConfigForFormat = getConversionConfigForFormat(format);
+      if (!conversionConfigForFormat.isPresent()) {
+        continue;
+      }
+      String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get()
+          .getDestinationTableName();
+      DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable);
+      String destLocation = conversionConfigForFormat.get().getDestinationDataPath() + Path.SEPARATOR + "final";
+      dest.addMetadata(DatasetConstants.FS_SCHEME, getSourceDataset().getMetadata().get(DatasetConstants.FS_SCHEME));
+      dest.addMetadata(DatasetConstants.FS_LOCATION, destLocation);
+      destDatasets.add(dest);
+    }
+    return destDatasets;
+  }
+
+  private DatasetDescriptor createSourceDataset() {
+    try {
+      String sourceTable = getTable().getDbName() + "." + getTable().getTableName();
+      DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
+      Path sourcePath = getTable().getDataLocation();
+      String sourceLocation = Path.getPathWithoutSchemeAndAuthority(sourcePath).toString();
+      FileSystem sourceFs = sourcePath.getFileSystem(new Configuration());
+      source.addMetadata(DatasetConstants.FS_SCHEME, sourceFs.getScheme());
+      source.addMetadata(DatasetConstants.FS_LOCATION, sourceLocation);
+      return source;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
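
For orientation, one iteration of createDestDatasets() above produces a
descriptor shaped like the following; the values match the nestedOrc
assertions in ConvertibleHiveDatasetTest at the bottom of this diff and are
shown only as an illustration:

    DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE,
        "db1_nestedOrcDb.tb1_nestedOrc");
    // fsScheme is copied from the source descriptor; fsLocation is the
    // conversion's destinationDataPath with "/final" appended.
    dest.addMetadata(DatasetConstants.FS_SCHEME, "file");
    dest.addMetadata(DatasetConstants.FS_LOCATION, "/tmp/data_nestedOrc/db1/tb1/final");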

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
index ff4d9e9..f87a6a0 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java
@@ -31,6 +31,13 @@ import javax.annotation.Nonnull;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -90,6 +97,7 @@ public class HiveConvertPublisher extends DataPublisher {
   private final FileSystem fs;
   private final HiveSourceWatermarker watermarker;
   private final HiveMetastoreClientPool pool;
+  private final Optional<LineageInfo> lineageInfo;
 
   public static final String PARTITION_PARAMETERS_WHITELIST = "hive.conversion.partitionParameters.whitelist";
   public static final String PARTITION_PARAMETERS_BLACKLIST = "hive.conversion.partitionParameters.blacklist";
@@ -105,6 +113,15 @@ public class HiveConvertPublisher extends DataPublisher {
     this.metricContext = Instrumented.getMetricContext(state, HiveConvertPublisher.class);
     this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, EventConstants.CONVERSION_NAMESPACE).build();
 
+    // Extract LineageInfo from state
+    if (state instanceof SourceState) {
+      lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker());
+    } else if (state instanceof WorkUnitState) {
+      lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable());
+    } else {
+      lineageInfo = Optional.absent();
+    }
+
     Configuration conf = new Configuration();
     Optional<String> uri = Optional.fromNullable(this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI));
     if (uri.isPresent()) {
@@ -226,6 +243,10 @@ public class HiveConvertPublisher extends DataPublisher {
             } catch (Exception e) {
               log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e);
             }
+            if (LineageUtils.shouldSetLineageInfo(wus.getWorkunit())) {
+              setDestLineageInfo(wus.getWorkunit(),
+                  (ConvertibleHiveDataset) ((HiveWorkUnit) wus.getWorkunit()).getHiveDataset(), this.lineageInfo);
+            }
           }
         }
       }
@@ -254,6 +275,17 @@ public class HiveConvertPublisher extends DataPublisher {
   }
 
   @VisibleForTesting
+  public void setDestLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset convertibleHiveDataset,
+      Optional<LineageInfo> lineageInfo) {
+    List<DatasetDescriptor> destDatasets = convertibleHiveDataset.getDestDatasets();
+    for (int i = 0; i < destDatasets.size(); i++) {
+      if (lineageInfo.isPresent()) {
+        lineageInfo.get().putDestination(destDatasets.get(i), i + 1, workUnit);
+      }
+    }
+  }
+
+  @VisibleForTesting
   public void preservePartitionParams(Collection<? extends WorkUnitState> states) {
     for (WorkUnitState wus : states) {
       if (wus.getWorkingState() != WorkingState.COMMITTED) {
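
Once setSourceLineageInfo (in HiveAvroToOrcSource) and setDestLineageInfo
above have both run, the workunit's spec properties carry the lineage event
in the layout asserted by the test at the bottom of this diff. Roughly, with
the exact JSON shape abbreviated and destination branches 1-indexed per
putDestination(dest, i + 1, ...):

    // gobblin.event.lineage.name                 = /tmp/test
    // gobblin.event.lineage.source               = {"platform":"file","name":"/tmp/test",...}
    // gobblin.event.lineage.branch.1.destination = {"platform":"file","name":"/tmp/data_nestedOrc/db1/tb1/final",...}
    // gobblin.event.lineage.branch.2.destination = {"platform":"file","name":"/tmp/data_flattenedOrc/db1/tb1/final",...}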

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
index 89615e9..4659aad 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveAvroToOrcSource.java
@@ -16,18 +16,26 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.source;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import java.util.List;
 
 import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
 /**
  * An extension to {@link HiveSource} that is used for Avro to ORC conversion jobs.
  */
 public class HiveAvroToOrcSource extends HiveSource {
-
+  private Optional<LineageInfo> lineageInfo;
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
     if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
@@ -36,8 +44,26 @@ public class HiveAvroToOrcSource extends HiveSource {
     if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) {
       state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro");
     }
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+    List<WorkUnit> workunits = super.getWorkunits(state);
+    for (WorkUnit workUnit : workunits) {
+      if (LineageUtils.shouldSetLineageInfo(workUnit)) {
+        setSourceLineageInfo(workUnit, (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(),
+            this.lineageInfo);
+      }
+    }
+    return workunits;
+  }
 
-    return super.getWorkunits(state);
+  @VisibleForTesting
+  public void setSourceLineageInfo(WorkUnit workUnit, ConvertibleHiveDataset hiveDataset,
+      Optional<LineageInfo> lineageInfo) {
+    DatasetDescriptor sourceDataset = hiveDataset.getSourceDataset();
+    if (lineageInfo.isPresent()) {
+      lineageInfo.get().setSource(sourceDataset, workUnit);
+    }
   }
 
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 4cee48f..3ad99fd 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -27,13 +27,6 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
-import org.apache.gobblin.dataset.DatasetConstants;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -155,7 +148,6 @@ public class HiveSource implements Source {
   protected long maxLookBackTime;
   protected long beginGetWorkunitsTime;
   protected List<String> ignoreDataPathIdentifierList;
-  protected SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker;
 
   protected final ClassAliasResolver<HiveBaseExtractorFactory> classAliasResolver =
       new ClassAliasResolver<>(HiveBaseExtractorFactory.class);
@@ -222,7 +214,6 @@ public class HiveSource implements Source {
     this.maxLookBackTime = new DateTime().minusDays(maxLookBackDays).getMillis();
     this.ignoreDataPathIdentifierList = COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY,
         DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
-    this.sharedJobBroker = state.getBroker();
 
     silenceHiveLoggers();
   }
@@ -261,10 +252,7 @@ public class HiveSource implements Source {
 
         EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(),
             this.beginGetWorkunitsTime);
-        if (hiveDataset instanceof ConvertibleHiveDataset) {
-          setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
-          log.info("Added lineage event for dataset " + hiveDataset.getUrn());
-        }
+
         this.workunits.add(hiveWorkUnit);
         log.debug(String.format("Workunit added for table: %s", hiveWorkUnit));
 
@@ -293,7 +281,7 @@ public class HiveSource implements Source {
   }
 
   protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
-    boolean setLineageInfo = false;
+
     long tableProcessTime = new DateTime().getMillis();
     this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime);
 
@@ -341,12 +329,7 @@ public class HiveSource implements Source {
 
           EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
               lowWatermark.getValue(), this.beginGetWorkunitsTime);
-          if (hiveDataset instanceof ConvertibleHiveDataset && !setLineageInfo) {
-            setLineageInfo((ConvertibleHiveDataset) hiveDataset, hiveWorkUnit, this.sharedJobBroker);
-            log.info("Added lineage event for dataset " + hiveDataset.getUrn());
-            // Add lineage information only once per hive table
-            setLineageInfo = true;
-          }
+
           workunits.add(hiveWorkUnit);
           log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s",
               sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue()));
@@ -491,35 +474,4 @@ public class HiveSource implements Source {
   private boolean isAvro(Table table) {
     return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
   }
-
-  public static void setLineageInfo(ConvertibleHiveDataset convertibleHiveDataset, WorkUnit workUnit,
-      SharedResourcesBroker<GobblinScopeTypes> sharedJobBroker)
-      throws IOException {
-    String sourceTable =
-        convertibleHiveDataset.getTable().getDbName() + "." + convertibleHiveDataset.getTable().getTableName();
-    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, sourceTable);
-    source.addMetadata(DatasetConstants.FS_URI,
-        convertibleHiveDataset.getTable().getDataLocation().getFileSystem(new Configuration()).getUri().toString());
-
-    int virtualBranch = 0;
-    for (String format : convertibleHiveDataset.getDestFormats()) {
-      ++virtualBranch;
-      Optional<ConvertibleHiveDataset.ConversionConfig> conversionConfigForFormat =
-          convertibleHiveDataset.getConversionConfigForFormat(format);
-      Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(sharedJobBroker);
-      if (!lineageInfo.isPresent()) {
-        continue;
-      } else if (!conversionConfigForFormat.isPresent()) {
-        continue;
-      }
-      String destTable = conversionConfigForFormat.get().getDestinationDbName() + "." + conversionConfigForFormat.get()
-          .getDestinationTableName();
-      DatasetDescriptor dest = new DatasetDescriptor(DatasetConstants.PLATFORM_HIVE, destTable);
-      Path destPath = new Path(conversionConfigForFormat.get().getDestinationDataPath());
-      dest.addMetadata(DatasetConstants.FS_URI, destPath.getFileSystem(new Configuration()).getUri().toString());
-
-      lineageInfo.get().setSource(source, workUnit);
-      lineageInfo.get().putDestination(dest, virtualBranch, workUnit);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
new file mode 100644
index 0000000..249359b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/utils/LineageUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.data.management.conversion.hive.utils;
+
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * Utility functions for tracking lineage in hive conversion workflows
+ */
+public class LineageUtils {
+  public static boolean shouldSetLineageInfo(WorkUnit workUnit) {
+    if (!(workUnit instanceof HiveWorkUnit)) {
+      return false;
+    }
+    HiveWorkUnit hiveWorkUnit = (HiveWorkUnit) workUnit;
+    if (hiveWorkUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY, false)) {
+      return false;
+    }
+    HiveDataset hiveDataset = hiveWorkUnit.getHiveDataset();
+    return hiveDataset instanceof ConvertibleHiveDataset;
+  }
+
+  private LineageUtils() {
+    // Can't instantiate; utility class
+  }
+}
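
Both call sites added in this patch (HiveAvroToOrcSource.getWorkunits and
HiveConvertPublisher) gate on this helper before casting; a condensed sketch
of the pattern:

    if (LineageUtils.shouldSetLineageInfo(workUnit)) {
      // Safe: the guard guarantees a HiveWorkUnit wrapping a
      // ConvertibleHiveDataset, and filters out watermark workunits.
      ConvertibleHiveDataset dataset =
          (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset();
      setSourceLineageInfo(workUnit, dataset, this.lineageInfo);
    }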

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1c03ea22/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index c399264..b1a38f0 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.dataset;
 
+import com.google.common.base.Optional;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -27,13 +28,18 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
-import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.conversion.hive.publisher.HiveConvertPublisher;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveAvroToOrcSource;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.utils.LineageUtils;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.HiveToHdfsDatasetResolver;
+import org.apache.gobblin.dataset.HiveToHdfsDatasetResolverFactory;
 import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.mockito.Mockito;
@@ -56,39 +62,57 @@ import static org.mockito.Mockito.when;
 
 @Test(groups = { "gobblin.data.management.conversion" })
 public class ConvertibleHiveDatasetTest {
-
+  /**
+   * Test if lineage information is properly set in the workunit for convertible hive datasets
+   */
   @Test
-  public void testLineageInfo()
-      throws Exception {
+  public void testLineageInfo() throws Exception {
     String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf";
     Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
-    WorkUnit workUnit = WorkUnit.createEmpty();
+    // Set datasetResolverFactory to convert the Hive lineage event into an HDFS lineage event
     Gson GSON = new Gson();
-    HiveSource.setLineageInfo(createTestConvertibleDataset(config), workUnit, getSharedJobBroker());
+    ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config);
+    HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset);
+    workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
+        HiveToHdfsDatasetResolverFactory.class.getName());
+    workUnit.setProp(ConfigurationKeys.JOB_ID_KEY, "123456");
+    Optional<LineageInfo> lineageInfo = LineageInfo.getLineageInfo(getSharedJobBroker(workUnit.getProperties()));
+    HiveAvroToOrcSource src = new HiveAvroToOrcSource();
+    Assert.assertTrue(LineageUtils.shouldSetLineageInfo(workUnit));
+    src.setSourceLineageInfo(workUnit,
+      (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+    new HiveConvertPublisher(workUnit).setDestLineageInfo(workUnit,
+      (ConvertibleHiveDataset) ((HiveWorkUnit) workUnit).getHiveDataset(), lineageInfo);
+
     Properties props = workUnit.getSpecProperties();
     // Assert that lineage name is correct
-    Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "db1.tb1");
+    Assert.assertEquals(props.getProperty("gobblin.event.lineage.name"), "/tmp/test");
 
     // Assert that source is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
     DatasetDescriptor sourceDD =
         GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class);
-    Assert.assertEquals(sourceDD.getPlatform(), DatasetConstants.PLATFORM_HIVE);
-    Assert.assertEquals(sourceDD.getName(), "db1.tb1");
+    Assert.assertEquals(sourceDD.getPlatform(), "file");
+    Assert.assertEquals(sourceDD.getName(), "/tmp/test");
+    Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1");
 
     // Assert that first dest is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
     DatasetDescriptor destDD1 =
         GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.1.destination"), DatasetDescriptor.class);
-    Assert.assertEquals(destDD1.getPlatform(), DatasetConstants.PLATFORM_HIVE);
-    Assert.assertEquals(destDD1.getName(), "db1_nestedOrcDb.tb1_nestedOrc");
+    Assert.assertEquals(destDD1.getPlatform(), "file");
+    Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final");
+    Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
+        "db1_nestedOrcDb.tb1_nestedOrc");
 
     // Assert that second dest is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
     DatasetDescriptor destDD2 =
         GSON.fromJson(props.getProperty("gobblin.event.lineage.branch.2.destination"), DatasetDescriptor.class);
-    Assert.assertEquals(destDD2.getPlatform(), DatasetConstants.PLATFORM_HIVE);
-    Assert.assertEquals(destDD2.getName(), "db1_flattenedOrcDb.tb1_flattenedOrc");
+    Assert.assertEquals(destDD2.getPlatform(), "file");
+    Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
+    Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
+        "db1_flattenedOrcDb.tb1_flattenedOrc");
 
     // Assert that there are two eventBuilders for nestedOrc and flattenedOrc
     Collection<LineageEventBuilder> lineageEventBuilders = LineageInfo.load(Collections.singleton(workUnit));
@@ -227,15 +251,16 @@ public class ConvertibleHiveDatasetTest {
     Table table = new Table();
     table.setDbName(dbName);
     table.setTableName(tableName);
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
     StorageDescriptor sd = new StorageDescriptor();
     sd.setLocation("/tmp/test");
     table.setSd(sd);
     return table;
   }
 
-  public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker() {
+  public static SharedResourcesBroker<GobblinScopeTypes> getSharedJobBroker(Properties props) {
     SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
-        .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+        .createDefaultTopLevelBroker(ConfigFactory.parseProperties(props), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
     SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
         .newSubscopedBuilder(new JobScopeInstance("ConvertibleHiveDatasetLineageEventTest", String.valueOf(System.currentTimeMillis())))
         .build();