You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/09/14 19:31:08 UTC

[1/2] incubator-gobblin git commit: [GOBBLIN-587] Implement partition level lineage for fs based destination

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e74c8b711 -> ef59a1517


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
deleted file mode 100644
index 17f1ac0..0000000
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.io;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-
-import org.apache.gobblin.util.test.BaseClass;
-import org.apache.gobblin.util.test.TestClass;
-
-
-public class GsonInterfaceAdapterTest {
-
-  @Test(groups = {"gobblin.util.io"})
-  public void test() {
-    Gson gson = GsonInterfaceAdapter.getGson(Object.class);
-
-    TestClass test = new TestClass();
-    test.absent = Optional.absent();
-    Assert.assertNotEquals(test, new TestClass());
-
-    String ser = gson.toJson(test);
-    BaseClass deser = gson.fromJson(ser, BaseClass.class);
-    Assert.assertEquals(test, deser);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/test/java/org/apache/gobblin/util/test/BaseClass.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/BaseClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/BaseClass.java
deleted file mode 100644
index 4d1d7c7..0000000
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/BaseClass.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.test;
-
-import lombok.EqualsAndHashCode;
-
-import java.util.Random;
-
-
-/**
- * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
- */
-@EqualsAndHashCode
-public class BaseClass {
-
-  public BaseClass() {
-    this.field = Integer.toString(new Random().nextInt());
-  }
-
-  private String field;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
deleted file mode 100644
index 32d43cc..0000000
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.test;
-
-import lombok.EqualsAndHashCode;
-
-import java.util.Random;
-
-
-/**
- * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
- */
-@EqualsAndHashCode(callSuper = true)
-public class ExtendedClass extends BaseClass {
-
-  private final int otherField = new Random().nextInt();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestClass.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestClass.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestClass.java
deleted file mode 100644
index fd1947c..0000000
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/test/TestClass.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.test;
-
-import lombok.EqualsAndHashCode;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-
-/**
- * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
- */
-@EqualsAndHashCode(callSuper = true)
-public class TestClass extends BaseClass {
-
-  private static final Random random = new Random();
-
-  private final int intValue = random.nextInt();
-  private final long longValue = random.nextLong();
-  private final double doubleValue = random.nextLong();
-  private final Map<String, Integer> map = createRandomMap();
-  private final List<String> list = createRandomList();
-  private final Optional<String> present = Optional.of(Integer.toString(random.nextInt()));
-  // Set manually to absent
-  public Optional<String> absent = Optional.of("a");
-  private final Optional<BaseClass> optionalObject = Optional.of(new BaseClass());
-  private final BaseClass polymorphic = new ExtendedClass();
-  private final Optional<? extends BaseClass> polymorphicOptional = Optional.of(new ExtendedClass());
-
-  private static Map<String, Integer> createRandomMap() {
-    Map<String, Integer> map = Maps.newHashMap();
-    int size = random.nextInt(5);
-    for (int i = 0; i < size; i++) {
-      map.put("value" + random.nextInt(), random.nextInt());
-    }
-    return map;
-  }
-
-  private static List<String> createRandomList() {
-    List<String> list = Lists.newArrayList();
-    int size = random.nextInt(5);
-    for (int i = 0; i < size; i++) {
-      list.add("value" + random.nextInt());
-    }
-    return list;
-  }
-
-}


[2/2] incubator-gobblin git commit: [GOBBLIN-587] Implement partition level lineage for fs based destination

Posted by ab...@apache.org.
[GOBBLIN-587] Implement partition level lineage for fs based destination

Closes #2453 from zxcware/pd


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef59a151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef59a151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef59a151

Branch: refs/heads/master
Commit: ef59a1517575f41671b0ec4ffa6ac53b3648e30c
Parents: e74c8b7
Author: zhchen <zh...@linkedin.com>
Authored: Fri Sep 14 12:31:02 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Sep 14 12:31:02 2018 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/dataset/Descriptor.java  |  71 +++----
 .../gobblin/dataset/PartitionDescriptor.java    |  28 +++
 .../gobblin/util/io/GsonInterfaceAdapter.java   | 195 +++++++++++++++++++
 .../org/apache/gobblin/writer/DataWriter.java   |  23 ++-
 .../apache/gobblin/dataset/DescriptorTest.java  |  13 +-
 .../util/io/GsonInterfaceAdapterTest.java       |  46 +++++
 .../org/apache/gobblin/util/test/BaseClass.java |  37 ++++
 .../apache/gobblin/util/test/ExtendedClass.java |  33 ++++
 .../org/apache/gobblin/util/test/TestClass.java |  69 +++++++
 .../writer/InstrumentedDataWriterDecorator.java |   6 +
 .../gobblin/publisher/BaseDataPublisher.java    |  31 ++-
 .../publisher/TimePartitionedDataPublisher.java |  21 +-
 .../writer/CloseOnFlushWriterWrapper.java       |   6 +
 .../org/apache/gobblin/writer/FsDataWriter.java |  17 ++
 .../gobblin/writer/PartitionedDataWriter.java   |  72 ++++++-
 .../publisher/BaseDataPublisherTest.java        |  86 +++++++-
 .../gobblin/writer/PartitionedWriterTest.java   |  18 ++
 .../test/TestPartitionAwareWriterBuilder.java   |   9 +
 .../dataset/ConvertibleHiveDatasetTest.java     |  25 ++-
 .../apache/gobblin/metrics/MetricContext.java   |   2 +-
 .../event/lineage/LineageEventBuilder.java      |  18 +-
 .../metrics/event/lineage/LineageInfo.java      |  68 ++++---
 .../metrics/event/lineage/LineageEventTest.java |  27 +--
 .../gobblin/azkaban/AzkabanJobLauncher.java     |   8 +-
 .../salesforce/SalesforceSourceTest.java        |   5 +-
 .../gobblin/util/io/GsonInterfaceAdapter.java   | 195 -------------------
 .../util/io/GsonInterfaceAdapterTest.java       |  46 -----
 .../org/apache/gobblin/util/test/BaseClass.java |  37 ----
 .../apache/gobblin/util/test/ExtendedClass.java |  33 ----
 .../org/apache/gobblin/util/test/TestClass.java |  69 -------
 30 files changed, 810 insertions(+), 504 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
index 2e3daa5..3e30ab8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
@@ -18,13 +18,18 @@
 package org.apache.gobblin.dataset;
 
 import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonObject;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
 
 /**
  * A descriptor is a simplified representation of a resource, which could be a dataset, dataset partition, file, etc.
@@ -32,11 +37,6 @@ import lombok.RequiredArgsConstructor;
  * primary keys, version, etc
  *
  * <p>
- *   The class provides {@link #serialize(Descriptor)} and {@link #deserialize(String)} util methods pair to send
- *   a descriptor object over the wire
- * </p>
- *
- * <p>
  *   When the original object has complicated inner structure and there is a requirement to send it over the wire,
  *   it's a time to define a corresponding {@link Descriptor} becomes. In this case, the {@link Descriptor} can just
  *   have minimal information enough to construct the original object on the other side of the wire
@@ -52,7 +52,10 @@ import lombok.RequiredArgsConstructor;
 public class Descriptor {
 
   /** Use gson for ser/de */
-  private static final Gson GSON = new Gson();
+  public static final Gson GSON =
+      new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+  /** Type token for ser/de descriptor list */
+  private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType();
 
   /** Name of the resource */
   @Getter
@@ -68,41 +71,41 @@ public class Descriptor {
   }
 
   /**
-   * A helper class for ser/de of a {@link Descriptor}
+   * Serialize any {@link Descriptor} object as json string
+   *
+   * <p>
+   *   Note: it can serialize subclasses
+   * </p>
+   */
+  public static String toJson(Descriptor descriptor) {
+    return GSON.toJson(descriptor);
+  }
+
+  /**
+   * Deserialize the json string to a {@link Descriptor} object
+   */
+  public static Descriptor fromJson(String json) {
+    return fromJson(json, Descriptor.class);
+  }
+
+  /**
+   * Deserialize the json string to the specified {@link Descriptor} object
    */
-  @RequiredArgsConstructor
-  private static class Wrap {
-    /** The actual class name of the {@link Descriptor} */
-    private final String clazz;
-    /** A json representation of the {@link Descriptor}*/
-    private final JsonObject data;
+  public static <T extends Descriptor> T fromJson(String json, Class<T> clazz) {
+    return GSON.fromJson(json, clazz);
   }
 
   /**
-   * Serialize any {@link Descriptor} object to a string
+   * Serialize a list of descriptors as json string
    */
-  public static String serialize(Descriptor descriptor) {
-    if (descriptor == null) {
-      return GSON.toJson(null);
-    }
-    JsonObject data = GSON.toJsonTree(descriptor).getAsJsonObject();
-    return GSON.toJson(new Wrap(descriptor.getClass().getName(), data));
+  public static String toJson(List<Descriptor> descriptors) {
+    return GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE);
   }
 
   /**
-   * Deserialize a string, which results from {@link #serialize(Descriptor)}, into the original
-   * {@link Descriptor} object
+   * Deserialize the string resulting from {@link #toJson(List)} to a list of descriptors
    */
-  public static Descriptor deserialize(String serialized) {
-    Wrap wrap = GSON.fromJson(serialized, Wrap.class);
-    if (wrap == null) {
-      return null;
-    }
-
-    try {
-      return GSON.fromJson(wrap.data, (Type) Class.forName(wrap.clazz));
-    } catch (ClassNotFoundException e) {
-      return null;
-    }
+  public static List<Descriptor> fromJsonList(String jsonList) {
+    return GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
index f0b4dcf..2a5370e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.dataset;
 
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.reflect.TypeToken;
+
 import lombok.Getter;
 
 
@@ -24,6 +30,10 @@ import lombok.Getter;
  * A {@link Descriptor} to identifies a partition of a dataset
  */
 public class PartitionDescriptor extends Descriptor {
+
+  /** Type token for ser/de partition descriptor list */
+  private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType();
+
   @Getter
   private final DatasetDescriptor dataset;
 
@@ -37,6 +47,10 @@ public class PartitionDescriptor extends Descriptor {
     return new PartitionDescriptor(getName(), dataset);
   }
 
+  public PartitionDescriptor copyWithNewDataset(DatasetDescriptor dataset) {
+    return new PartitionDescriptor(getName(), dataset);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -56,4 +70,18 @@ public class PartitionDescriptor extends Descriptor {
     result = 31 * result + getName().hashCode();
     return result;
   }
+
+  /**
+   * Serialize a list of partition descriptors as json string
+   */
+  public static String toPartitionJsonList(List<PartitionDescriptor> descriptors) {
+    return Descriptor.GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE);
+  }
+
+  /**
+   * Deserialize the string resulting from {@link #toPartitionJsonList(List)} to a list of partition descriptors
+   */
+  public static List<PartitionDescriptor> fromPartitionJsonList(String jsonList) {
+    return Descriptor.GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
new file mode 100644
index 0000000..5973aa9
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+
+import java.io.IOException;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.internal.Streams;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+
+/**
+ * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects.
+ *
+ * <p>
+ *   This adapter will capture all instances of {@link #baseClass} and write them as
+ *   {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of
+ *   polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the
+ *   default GSON writer):
+ *   - Primitives and boxed primitives
+ *   - Arrays
+ *   - Collections
+ *   - Maps
+ *   Additionally, generic classes (e.g. class MyClass<T>) cannot be correctly decoded.
+ * </p>
+ *
+ * <p>
+ *   To use:
+ *    <pre>
+ *          {@code
+ *            MyClass object = new MyClass();
+ *            Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class);
+ *            String json = gson.toJson(object);
+ *            MyClass object2 = gson.fromJson(json, MyClass.class);
+ *          }
+ *    </pre>
+ * </p>
+ *
+ * <p>
+ *   Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize
+ *   all types except for java generics.
+ * </p>
+ *
+ * <p>The {@code baseClass} constructor argument is the interface or abstract type to be serialized and deserialized with {@link Gson}.</p>
+ */
+@RequiredArgsConstructor
+public class GsonInterfaceAdapter implements TypeAdapterFactory {
+
+  protected static final String OBJECT_TYPE = "object-type";
+  protected static final String OBJECT_DATA = "object-data";
+
+  private final Class<?> baseClass;
+
+  @Override
+  public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
+    if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType
+        || CharSequence.class.isAssignableFrom(type.getRawType())
+        || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType())
+            || Map.class.isAssignableFrom(type.getRawType())))) {
+      // delegate primitives, arrays, collections, and maps
+      return null;
+    }
+    if (!this.baseClass.isAssignableFrom(type.getRawType())) {
+      // delegate anything not assignable from base class
+      return null;
+    }
+    TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type);
+    return adapter;
+  }
+
+  @AllArgsConstructor
+  private static class InterfaceAdapter<R> extends TypeAdapter<R> {
+
+    private final Gson gson;
+    private final TypeAdapterFactory factory;
+    private final TypeToken<R> typeToken;
+
+    @Override
+    public void write(JsonWriter out, R value) throws IOException {
+      if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) {
+        Optional opt = (Optional) value;
+        if (opt != null && opt.isPresent()) {
+          Object actualValue = opt.get();
+          writeObject(actualValue, out);
+        } else {
+          out.beginObject();
+          out.endObject();
+        }
+      } else {
+        writeObject(value, out);
+      }
+    }
+
+    @Override
+    public R read(JsonReader in) throws IOException {
+      JsonElement element = Streams.parse(in);
+      if (element.isJsonNull()) {
+        return readNull();
+      }
+      JsonObject jsonObject = element.getAsJsonObject();
+
+      if (this.typeToken.getRawType() == Optional.class) {
+        if (jsonObject.has(OBJECT_TYPE)) {
+          return (R) Optional.of(readValue(jsonObject, null));
+        } else if (jsonObject.entrySet().isEmpty()) {
+          return (R) Optional.absent();
+        } else {
+          throw new IOException("No class found for Optional value.");
+        }
+      }
+      return this.readValue(jsonObject, this.typeToken);
+    }
+
+    private <S> S readNull() {
+      if (this.typeToken.getRawType() == Optional.class) {
+        return (S) Optional.absent();
+      }
+      return null;
+    }
+
+    private <S> void writeObject(S value, JsonWriter out) throws IOException {
+      if (value != null) {
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName()));
+        TypeAdapter<S> delegate =
+            (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass()));
+        jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value));
+        Streams.write(jsonObject, out);
+      } else {
+        out.nullValue();
+      }
+    }
+
+    private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException {
+      try {
+        TypeToken<S> actualTypeToken;
+        if (jsonObject.isJsonNull()) {
+          return null;
+        } else if (jsonObject.has(OBJECT_TYPE)) {
+          String className = jsonObject.get(OBJECT_TYPE).getAsString();
+          Class<S> klazz = (Class<S>) Class.forName(className);
+          actualTypeToken = TypeToken.get(klazz);
+        } else if (defaultTypeToken != null) {
+          actualTypeToken = defaultTypeToken;
+        } else {
+          throw new IOException("Could not determine TypeToken.");
+        }
+        TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, actualTypeToken);
+        S value = delegate.fromJsonTree(jsonObject.get(OBJECT_DATA));
+        return value;
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    }
+
+  }
+
+  public static <T> Gson getGson(Class<T> clazz) {
+    Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create();
+    return gson;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
index 18d11d6..400dcdf 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
@@ -21,13 +21,18 @@ import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
 
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.stream.RecordEnvelope;
 
 
 /**
- * An interface for data writers.
+ * An interface for data writers
+ *
+ * <p>
+ *   Generally, one work unit has a dedicated {@link DataWriter} instance, which processes only one dataset
+ * </p>
  *
  * @param <D> data record type
  *
@@ -77,6 +82,22 @@ public interface DataWriter<D> extends Closeable, Flushable {
       throws IOException;
 
   /**
+   * The method should return a {@link Descriptor} that represents what the writer is writing
+   *
+   * <p>
+   *   Note that, this information might be useless and discarded by a
+   *   {@link org.apache.gobblin.publisher.DataPublisher}, which determines the final form of dataset or partition
+   * </p>
+   *
+   * @return a {@link org.apache.gobblin.dataset.DatasetDescriptor} if it writes files of a dataset or
+   *         a {@link org.apache.gobblin.dataset.PartitionDescriptor} if it writes files of a dataset partition or
+   *         {@code null} if it is useless
+   */
+  default Descriptor getDataDescriptor() {
+    return null;
+  }
+
+  /**
    * Write the input {@link RecordEnvelope}. By default, just call {@link #write(Object)}.
    */
   default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
index 6ac875b..d7b0409 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
@@ -38,20 +38,15 @@ public class DescriptorTest {
 
   @Test
   public void testPartitionDescriptor() {
-    // Test serialization
-    String partitionJson = "{\"clazz\":\"org.apache.gobblin.dataset.PartitionDescriptor\",\"data\":{\"dataset\":{\"platform\":\"hdfs\",\"metadata\":{},\"name\":\"/data/tracking/PageViewEvent\"},\"name\":\"hourly/2018/08/14/18\"}}";
-
     DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent");
     String partitionName = "hourly/2018/08/14/18";
     PartitionDescriptor partition = new PartitionDescriptor(partitionName, dataset);
-    Assert.assertEquals(Descriptor.serialize(partition), partitionJson);
-    System.out.println(partitionJson);
 
-    Descriptor partition2 = Descriptor.deserialize(partitionJson);
+    // Test copy with new dataset
+    DatasetDescriptor dataset2 = new DatasetDescriptor("hive", "/data/tracking/PageViewEvent");
+    Descriptor partition2 = partition.copyWithNewDataset(dataset2);
     Assert.assertEquals(partition2.getName(), partition.getName());
-    Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), partition.getDataset());
-    Assert.assertEquals(partition, partition2);
-    Assert.assertEquals(partition.hashCode(), partition2.hashCode());
+    Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), dataset2);
 
     // Test copy
     PartitionDescriptor partition3 = partition.copy();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
new file mode 100644
index 0000000..17f1ac0
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+
+import org.apache.gobblin.util.test.BaseClass;
+import org.apache.gobblin.util.test.TestClass;
+
+
+public class GsonInterfaceAdapterTest {
+
+  @Test(groups = {"gobblin.util.io"})
+  public void test() {
+    Gson gson = GsonInterfaceAdapter.getGson(Object.class);
+
+    TestClass test = new TestClass();
+    test.absent = Optional.absent();
+    Assert.assertNotEquals(test, new TestClass());
+
+    String ser = gson.toJson(test);
+    BaseClass deser = gson.fromJson(ser, BaseClass.class);
+    Assert.assertEquals(test, deser);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
new file mode 100644
index 0000000..4d1d7c7
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.Random;
+
+
+/**
+ * Base type of the fixtures used by {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode
+public class BaseClass {
+
+  // Random per instance
+  private String field;
+
+  public BaseClass() {
+    this.field = String.valueOf(new Random().nextInt());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
new file mode 100644
index 0000000..32d43cc
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.Random;
+
+
+/**
+ * {@link BaseClass} subtype fixture for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class ExtendedClass extends BaseClass {
+
+  private final int otherField = new Random().nextInt();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
new file mode 100644
index 0000000..fd1947c
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
+/**
+ * Randomly populated fixture for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class TestClass extends BaseClass {
+
+  private static final Random random = new Random();
+
+  private final int intValue = random.nextInt();
+  private final long longValue = random.nextLong();
+  private final double doubleValue = random.nextDouble(); // a fraction, not a long widened to double
+  private final Map<String, Integer> map = createRandomMap();
+  private final List<String> list = createRandomList();
+  private final Optional<String> present = Optional.of(Integer.toString(random.nextInt()));
+  // Starts out present; GsonInterfaceAdapterTest overwrites it with Optional.absent()
+  public Optional<String> absent = Optional.of("a");
+  private final Optional<BaseClass> optionalObject = Optional.of(new BaseClass());
+  private final BaseClass polymorphic = new ExtendedClass();
+  private final Optional<? extends BaseClass> polymorphicOptional = Optional.of(new ExtendedClass());
+
+  private static Map<String, Integer> createRandomMap() {
+    Map<String, Integer> map = Maps.newHashMap();
+    int size = random.nextInt(5); // at most 4 entries, possibly none
+    for (int i = 0; i < size; i++) {
+      map.put("value" + random.nextInt(), random.nextInt());
+    }
+    return map;
+  }
+
+  private static List<String> createRandomList() {
+    List<String> list = Lists.newArrayList();
+    int size = random.nextInt(5); // between 0 and 4 elements
+    for (int i = 0; i < size; i++) {
+      list.add("value" + random.nextInt());
+    }
+    return list;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
index e90f895..efc349f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.records.ControlMessageHandler;
@@ -127,6 +128,11 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa
   }
 
   @Override
+  public Descriptor getDataDescriptor() {
+    return this.embeddedWriter.getDataDescriptor(); // delegate to the decorated writer
+  }
+
+  @Override
   public Object getDecoratedObject() {
     return this.embeddedWriter;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 2ddcd76..56b326e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.publisher;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -59,19 +60,20 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.WriterUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
 import org.apache.gobblin.writer.PartitionIdentifier;
+import org.apache.gobblin.writer.PartitionedDataWriter;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.*;
 
@@ -294,12 +296,31 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
   }
 
   private void addLineageInfo(WorkUnitState state, int branchId) {
-    DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
-    if (this.lineageInfo.isPresent()) {
-      this.lineageInfo.get().putDestination(destination, branchId, state);
+    if (!this.lineageInfo.isPresent()) {
+      LOG.info("Will not add lineage info");
+      return;
+    }
+
+    // Destination dataset descriptor (also handed to copyWithNewDataset for each partition below)
+    DatasetDescriptor datasetDescriptor = createDestinationDescriptor(state, branchId);
+
+    List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, branchId);
+    List<Descriptor> descriptors = new ArrayList<>();
+    if (partitions.size() == 0) {
+      // No partition info found in the state: report dataset level lineage
+      descriptors.add(datasetDescriptor);
+    } else {
+      // Report partition level lineage, one descriptor per partition
+      for (PartitionDescriptor partition : partitions) {
+        descriptors.add(partition.copyWithNewDataset(datasetDescriptor));
+      }
     }
+    this.lineageInfo.get().putDestination(descriptors, branchId, state);
   }
 
+  /**
+   * Create destination dataset descriptor
+   */
   protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
     Path publisherOutputDir = getPublisherOutputDir(state, branchId);
     FileSystem fs = this.publisherFileSystemByBranches.get(branchId);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 157552e..e3ca31c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -18,13 +18,17 @@
 package org.apache.gobblin.publisher;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.collect.Lists;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.util.ParallelRunner;
@@ -68,21 +72,4 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher {
       movePath(parallelRunner, workUnitState, status.getPath(), outputPath, branchId);
     }
   }
-
-  @Override
-  protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
-    // Get base descriptor
-    DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId);
-
-    // Decorate with partition prefix
-    String propName = ForkOperatorUtils
-        .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId);
-    String timePrefix = state.getProp(propName, "");
-    Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix);
-    DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString());
-    // Add back the metadata
-    descriptor.getMetadata().forEach(destination::addMetadata);
-
-    return destination;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 66e5a26..5e912bb 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.stream.ControlMessage;
@@ -144,6 +145,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
   }
 
   @Override
+  public Descriptor getDataDescriptor() {
+    return writer.getDataDescriptor(); // delegate to the wrapped writer
+  }
+
+  @Override
   public ControlMessageHandler getMessageHandler() {
     return this.controlMessageHandler;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index f4e4931..aa228af 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -39,6 +39,10 @@ import org.apache.gobblin.codec.StreamCodec;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.metadata.types.GlobalMetadata;
 import org.apache.gobblin.util.FinalState;
 import org.apache.gobblin.util.ForkOperatorUtils;
@@ -155,6 +159,19 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
     }
   }
 
+  @Override
+  public Descriptor getDataDescriptor() {
+    // The dataset results from WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId).
+    // The writer dataset might not be the same as the published dataset.
+    DatasetDescriptor writerDataset = new DatasetDescriptor(fs.getScheme(), outputFile.getParent().toString());
+
+    // A partition key, when set, wraps the dataset in a partition descriptor
+    if (partitionKey != null) {
+      return new PartitionDescriptor(partitionKey, writerDataset);
+    }
+    return writerDataset;
+  }
+
   /**
    * Create the staging output file and an {@link OutputStream} to write to the file.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 83dd074..cdfc11b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.writer;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
@@ -29,9 +31,11 @@ import org.apache.commons.lang3.reflect.ConstructorUtils;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
 
 import lombok.extern.slf4j.Slf4j;
@@ -39,6 +43,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
 import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
 import org.apache.gobblin.records.ControlMessageHandler;
@@ -66,6 +72,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
 
   private int writerIdSuffix = 0;
   private final String baseWriterId;
+  private final State state;
+  private final int branchId;
+
   private final Optional<WriterPartitioner> partitioner;
   private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
   private final Optional<PartitionAwareDataWriterBuilder> builder;
@@ -78,6 +87,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
 
   public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state)
       throws IOException {
+    this.state = state;
+    this.branchId = builder.branch;
+
     this.isSpeculativeAttemptSafe = true;
     this.isWatermarkCapable = true;
     this.baseWriterId = builder.getWriterId();
@@ -227,7 +239,11 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
   @Override
   public void close()
       throws IOException {
-    this.closer.close();
+    try {
+      serializePartitionInfoToState();
+    } finally {
+      this.closer.close(); // runs even if serializing the partition info fails
+    }
   }
 
   private DataWriter<D> createPartitionWriter(GenericRecord partition)
@@ -352,4 +368,58 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
       cloner.close();
     }
   }
+
+  /** Get the key under which the serialized partitions info of a branch is stored in {@link #state}. */
+  private static String getPartitionsKey(int branchId) {
+    // Plain concatenation rather than String.format: %d output depends on the default locale,
+    // and a property key must be spelled the same way everywhere
+    return "writer." + branchId + ".partitions";
+  }
+
+  /**
+   * Serialize partitions info to {@link #state} if there is any. A writer whose descriptor is null or is
+   * not a {@link PartitionDescriptor} is skipped with a warning.
+   */
+  private void serializePartitionInfoToState() {
+    List<PartitionDescriptor> descriptors = new ArrayList<>();
+
+    for (DataWriter<D> writer : partitionWriters.asMap().values()) {
+      Descriptor descriptor = writer.getDataDescriptor();
+      if (descriptor == null) {
+        log.warn("Drop partition info as writer {} returns a null PartitionDescriptor", writer);
+      } else if (!(descriptor instanceof PartitionDescriptor)) {
+        log.warn("Drop partition info as writer {} does not return a PartitionDescriptor", writer);
+      } else {
+        descriptors.add((PartitionDescriptor) descriptor);
+      }
+    }
+
+    if (descriptors.isEmpty()) {
+      // Nothing usable was collected: the state is left without a partitions property
+      log.info("Partitions info not available. Will not serialize partitions");
+      return;
+    }
+
+    // Stored as JSON under the per-branch partitions key
+    state.setProp(getPartitionsKey(branchId), PartitionDescriptor.toPartitionJsonList(descriptors));
+  }
+
+  /**
+   * Read the partition info stored in the {@code state} for the given branch. When some is found, it is
+   * removed from the {@code state} to avoid persisting useless information; otherwise an empty list is returned
+   *
+   * <p>
+   *   In Gobblin, only the {@link PartitionedDataWriter} knows all partitions written for a work unit. Each partition
+   *   {@link DataWriter} decides the actual form of a dataset partition
+   * </p>
+   */
+  public static List<PartitionDescriptor> getPartitionInfoAndClean(State state, int branchId) {
+    String key = getPartitionsKey(branchId);
+    String serialized = state.getProp(key);
+    if (!Strings.isNullOrEmpty(serialized)) {
+      state.removeProp(key);
+      return PartitionDescriptor.fromPartitionJsonList(serialized);
+    }
+    return Lists.newArrayList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index d46d6e3..469b72f 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -21,25 +21,27 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Type;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -47,16 +49,19 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.GlobalMetadata;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
 import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
 import org.apache.gobblin.writer.PartitionIdentifier;
@@ -66,6 +71,10 @@ import org.apache.gobblin.writer.PartitionIdentifier;
  * Tests for BaseDataPublisher
  */
 public class BaseDataPublisherTest {
+  private static final Type PARTITION_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType();
+  private static final Gson GSON =
+      new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+
   /**
    * Test DATA_PUBLISHER_METADATA_STR: a user should be able to put an arbitrary metadata string in job configuration
    * and have that written out.
@@ -532,6 +541,9 @@ public class BaseDataPublisherTest {
     }
   }
 
+  /**
+   * Test lineage info is set on publishing single task
+   */
   @Test
   public void testPublishSingleTask()
       throws IOException {
@@ -545,6 +557,9 @@ public class BaseDataPublisherTest {
     Assert.assertFalse(state.contains("gobblin.event.lineage.branch.1.destination"));
   }
 
+  /**
+   * Test lineage info is set on publishing multiple tasks
+   */
   @Test
   public void testPublishMultiTasks()
       throws IOException {
@@ -562,6 +577,69 @@ public class BaseDataPublisherTest {
     Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.1.destination"));
   }
 
+  /**
+   * Test that a lineage event is published for every partition recorded in the state, on every branch
+   */
+  @Test
+  public void testPublishedPartitionsLineage()
+      throws IOException {
+    int numBranches = 2;
+    int numPartitionsPerBranch = 2;
+
+    WorkUnitState state = buildTaskState(numBranches);
+    LineageInfo lineageInfo = LineageInfo.getLineageInfo(state.getTaskBroker()).get();
+    DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
+    lineageInfo.setSource(source, state);
+    BaseDataPublisher publisher = new BaseDataPublisher(state);
+
+    // Set up writer partition descriptors
+    DatasetDescriptor datasetAtWriter = new DatasetDescriptor("dummy", "dummy");
+    for (int i = 0; i < numBranches; i++) {
+      List<PartitionDescriptor> partitions = new ArrayList<>();
+      for (int j = 0; j < numPartitionsPerBranch; j++) {
+        // Dummy dataset descriptor will be discarded by publisher
+        partitions.add(new PartitionDescriptor("partition" + i + j, datasetAtWriter));
+      }
+      String partitionsKey = "writer." + i + ".partitions";
+      state.setProp(partitionsKey, GSON.toJson(partitions, PARTITION_LIST_TYPE));
+    }
+
+    publisher.publish(ImmutableList.of(state));
+
+    Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination"));
+    Assert.assertTrue(state.contains("gobblin.event.lineage.branch.1.destination"));
+
+    Collection<LineageEventBuilder> events = LineageInfo.load(ImmutableList.of(state));
+    Assert.assertEquals(events.size(), numBranches * numPartitionsPerBranch);
+
+    // Find the partition lineage and assert
+    for (int i = 0; i < numBranches; i++) {
+      String outputPath = String.format("/data/output/branch%d/namespace/table", i);
+      DatasetDescriptor destinationDataset = new DatasetDescriptor("file", outputPath);
+      destinationDataset.addMetadata("fsUri", "file:///");
+      destinationDataset.addMetadata("branch", "" + i);
+
+      for (int j = 0; j < numPartitionsPerBranch; j++) {
+        LineageEventBuilder event = find(events, "partition" + i + j);
+        Assert.assertNotNull(event);
+        Assert.assertEquals(event.getSource(), source);
+        Assert.assertEquals(event.getDestination(),
+            // Dataset written by the writer is discarded
+            new PartitionDescriptor("partition" + i + j, destinationDataset));
+      }
+    }
+  }
+
+  /**
+   * Return the first event whose destination is named {@code partitionName}, or {@code null} if none is.
+   */
+  private static LineageEventBuilder find(Collection<LineageEventBuilder> events, String partitionName) {
+    return events.stream()
+        .filter(candidate -> candidate.getDestination().getName().equals(partitionName))
+        .findFirst()
+        .orElse(null);
+  }
+
   public static class TestAdditionMerger implements MetadataMerger<String> {
     private int sum = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 0dc9846..20984ef 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -19,14 +19,18 @@ package org.apache.gobblin.writer;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.gobblin.ack.BasicAckableForTesting;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.stream.FlushControlMessage;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+import org.testng.util.Strings;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -107,6 +111,10 @@ public class PartitionedWriterTest {
     action = builder.actions.poll();
     Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLEANUP);
 
+    // Before close, partitions info is not serialized
+    String partitionsKey = "writer.0.partitions";
+    Assert.assertTrue(state.getProp(partitionsKey) == null);
+
     writer.close();
     Assert.assertEquals(builder.actions.size(), 2);
     action = builder.actions.poll();
@@ -114,6 +122,16 @@ public class PartitionedWriterTest {
     action = builder.actions.poll();
     Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLOSE);
 
+    // After close, partitions info is available
+    Assert.assertFalse(Strings.isNullOrEmpty(state.getProp(partitionsKey)));
+    List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, 0);
+    Assert.assertTrue(state.getProp(partitionsKey) == null);
+    Assert.assertEquals(partitions.size(), 2);
+
+    DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset");
+    Assert.assertEquals(partitions.get(0), new PartitionDescriptor("a", dataset));
+    Assert.assertEquals(partitions.get(1), new PartitionDescriptor("1", dataset));
+
     writer.commit();
     Assert.assertEquals(builder.actions.size(), 2);
     action = builder.actions.poll();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
index 8b1e0ca..8da42d7 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
@@ -25,6 +25,9 @@ import org.apache.avro.Schema;
 import com.google.common.collect.Queues;
 
 import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder;
 
@@ -118,6 +121,12 @@ public class TestPartitionAwareWriterBuilder extends PartitionAwareDataWriterBui
     public boolean isSpeculativeAttemptSafe() {
       return true;
     }
+
+    @Override
+    public Descriptor getDataDescriptor() {
+      DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset");
+      return new PartitionDescriptor(this.partition, dataset);
+    }
   }
 
   @Data

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index eddd852..bfa76c5 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -18,10 +18,13 @@ package org.apache.gobblin.data.management.conversion.hive.dataset;
 
 import com.google.common.base.Optional;
 import java.io.InputStream;
+import java.lang.reflect.Type;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -53,18 +56,28 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset.ConversionConfig;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
 
 import static org.mockito.Mockito.when;
 
 
 @Test(groups = { "gobblin.data.management.conversion" })
 public class ConvertibleHiveDatasetTest {
+
+  /** Lineage info ser/de */
+  private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType();
+  private static final Gson GSON =
+      new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+
+
   /**
    * Test if lineage information is properly set in the workunit for convertible hive datasets
    */
@@ -73,7 +86,6 @@ public class ConvertibleHiveDatasetTest {
     String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf";
     Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
     // Set datasetResolverFactory to convert Hive Lineage event to Hdfs Lineage event
-    Gson GSON = new Gson();
     ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config);
     HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset);
     workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
@@ -104,7 +116,7 @@ public class ConvertibleHiveDatasetTest {
     // Assert that source is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
     DatasetDescriptor sourceDD =
-        (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.source"));
+        GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class);
     Assert.assertEquals(sourceDD.getPlatform(), "file");
     Assert.assertEquals(sourceDD.getName(), "/tmp/test");
     Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1");
@@ -112,7 +124,7 @@ public class ConvertibleHiveDatasetTest {
     // Assert that first dest is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
     DatasetDescriptor destDD1 =
-        (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.1.destination"));
+        (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.1.destination");
     Assert.assertEquals(destDD1.getPlatform(), "file");
     Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final");
     Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
@@ -121,13 +133,18 @@ public class ConvertibleHiveDatasetTest {
     // Assert that second dest is correct for lineage event
     Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
     DatasetDescriptor destDD2 =
-        (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.2.destination"));
+        (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.2.destination");
     Assert.assertEquals(destDD2.getPlatform(), "file");
     Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
     Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
         "db1_flattenedOrcDb.tb1_flattenedOrc");
   }
 
+  private Descriptor firstDescriptor(Properties prop, String destinationKey) {
+    List<Descriptor> descriptors = GSON.fromJson(prop.getProperty(destinationKey), DESCRIPTOR_LIST_TYPE);
+    return descriptors.get(0);
+  }
+
   @Test
   public void testFlattenedOrcConfig() throws Exception {
     String testConfFilePath = "convertibleHiveDatasetTest/flattenedOrc.conf";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index 46f8ab1..c712c4d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -733,7 +733,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
      */
     public MetricContext buildStrict() throws NameConflictException {
       if(this.parent == null) {
-        this.parent = RootMetricContext.get();
+        hasParent(RootMetricContext.get());
       }
       return new MetricContext(this.name, this.parent, this.tags, false);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index c920cc3..63d7237 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -17,16 +17,10 @@
 
 package org.apache.gobblin.metrics.event.lineage;
 
-
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.dataset.Descriptor;
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
-
 import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -35,6 +29,10 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
 
 /**
  * The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE}
@@ -65,8 +63,8 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
   @Override
   public GobblinTrackingEvent build() {
     Map<String, String> dataMap = Maps.newHashMap(metadata);
-    dataMap.put(SOURCE, Descriptor.serialize(source));
-    dataMap.put(DESTINATION, Descriptor.serialize(destination));
+    dataMap.put(SOURCE, Descriptor.toJson(source));
+    dataMap.put(DESTINATION, Descriptor.toJson(destination));
     return new GobblinTrackingEvent(0L, namespace, name, dataMap);
   }
 
@@ -126,10 +124,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
     metadata.forEach((key, value) -> {
       switch (key) {
         case SOURCE:
-          lineageEvent.setSource(Descriptor.deserialize(value));
+          lineageEvent.setSource(Descriptor.fromJson(value));
           break;
         case DESTINATION:
-          lineageEvent.setDestination(Descriptor.deserialize(value));
+          lineageEvent.setDestination(Descriptor.fromJson(value));
           break;
         default:
           lineageEvent.addMetadata(key, value);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 0311df7..3e65105 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -17,7 +17,10 @@
 
 package org.apache.gobblin.metrics.event.lineage;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -25,9 +28,9 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.gson.Gson;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -80,7 +83,6 @@ public final class LineageInfo {
   private static final String DATASET_RESOLVER_CONFIG_NAMESPACE = "datasetResolver";
 
   private static final String BRANCH = "branch";
-  private static final Gson GSON = new Gson();
   private static final String NAME_KEY = "name";
 
   private static final Config FALLBACK =
@@ -112,7 +114,7 @@ public final class LineageInfo {
     }
 
     state.setProp(getKey(NAME_KEY), descriptor.getName());
-    state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.serialize(descriptor));
+    state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.toJson(descriptor));
   }
 
   /**
@@ -123,20 +125,41 @@ public final class LineageInfo {
    *   is supposed to put the destination dataset information. Since different branches may concurrently put,
    *   the method is implemented to be threadsafe
    * </p>
+   *
+   * @deprecated Use {@link #putDestination(List, int, State)}
    */
+  @Deprecated
   public void putDestination(Descriptor destination, int branchId, State state) {
+    putDestination(Lists.newArrayList(destination), branchId, state);
+  }
+
+  /**
+   * Put data {@link Descriptor}s of a destination dataset to a state
+   *
+   * @param descriptors It can be a single item list which just has the dataset descriptor or a list
+   *                    of dataset partition descriptors
+   */
+  public void putDestination(List<Descriptor> descriptors, int branchId, State state) {
+
     if (!hasLineageInfo(state)) {
-      log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
+      log.warn("State has no lineage info but branch " + branchId + " puts {} descriptors", descriptors.size());
       return;
     }
-    log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId));
+
+    log.info(String.format("Put destination %s for branch %d", Descriptor.toJson(descriptors), branchId));
+
     synchronized (state.getProp(getKey(NAME_KEY))) {
-      Descriptor descriptor = resolver.resolve(destination, state);
-      if (descriptor == null) {
-        return;
+      List<Descriptor> resolvedDescriptors = new ArrayList<>();
+      for (Descriptor descriptor : descriptors) {
+        Descriptor resolvedDescriptor = resolver.resolve(descriptor, state);
+        if (resolvedDescriptor == null) {
+          continue;
+        }
+        resolvedDescriptors.add(resolvedDescriptor);
       }
 
-      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), Descriptor.serialize(descriptor));
+      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION),
+          Descriptor.toJson(resolvedDescriptors));
     }
   }
 
@@ -150,8 +173,8 @@ public final class LineageInfo {
     Preconditions.checkArgument(states != null && !states.isEmpty());
     Set<LineageEventBuilder> allEvents = Sets.newHashSet();
     for (State state : states) {
-      Map<String, LineageEventBuilder> branchedEvents = load(state);
-      allEvents.addAll(branchedEvents.values());
+      Map<String, Set<LineageEventBuilder>> branchedEvents = load(state);
+      branchedEvents.values().forEach(allEvents::addAll);
     }
     return allEvents;
   }
@@ -161,12 +184,12 @@ public final class LineageInfo {
    *
    * @return A map from branch to its lineage info. If there is no destination info, return an empty map
    */
-  static Map<String, LineageEventBuilder> load(State state) {
+  static Map<String, Set<LineageEventBuilder>> load(State state) {
     String name = state.getProp(getKey(NAME_KEY));
-    Descriptor source = Descriptor.deserialize(state.getProp(getKey(LineageEventBuilder.SOURCE)));
+    Descriptor source = Descriptor.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)));
 
     String branchedPrefix = getKey(BRANCH, "");
-    Map<String, LineageEventBuilder> events = Maps.newHashMap();
+    Map<String, Set<LineageEventBuilder>> events = Maps.newHashMap();
     if (source == null) {
       return events;
     }
@@ -180,16 +203,17 @@ public final class LineageInfo {
       String[] parts = key.substring(branchedPrefix.length()).split("\\.");
       assert parts.length == 2;
       String branchId = parts[0];
-      LineageEventBuilder event = events.get(branchId);
-      if (event == null) {
-        event = new LineageEventBuilder(name);
-        event.setSource(source.copy());
-        events.put(parts[0], event);
-      }
+      Set<LineageEventBuilder> branchEvents = events.computeIfAbsent(branchId, k -> new HashSet<>());
+
       switch (parts[1]) {
         case LineageEventBuilder.DESTINATION:
-          Descriptor destination = Descriptor.deserialize(entry.getValue().toString());
-          event.setDestination(destination);
+          List<Descriptor> descriptors = Descriptor.fromJsonList(entry.getValue().toString());
+          for (Descriptor descriptor : descriptors) {
+            LineageEventBuilder event = new LineageEventBuilder(name);
+            event.setSource(source);
+            event.setDestination(descriptor);
+            branchEvents.add(event);
+          }
           break;
         default:
           throw new RuntimeException("Unsupported lineage key: " + key);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 8ca6d5e..5fd7952 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.metrics.event.lineage;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -65,9 +66,9 @@ public class LineageEventTest {
     destination01.addMetadata(branch, "1");
     lineageInfo.putDestination(destination01, 1, state0);
 
-    Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
-    verify(events.get("0"), topic, source, destination00);
-    verify(events.get("1"), topic, source, destination01);
+    Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state0);
+    verify(first(events.get("0")), topic, source, destination00);
+    verify(first(events.get("1")), topic, source, destination01);
 
     State state1 = new State();
     lineageInfo.setSource(source, state1);
@@ -78,8 +79,8 @@ public class LineageEventTest {
     // Test only full fledged lineage events are loaded
     Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
     Assert.assertTrue(eventsList.size() == 2);
-    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
-    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
 
     // There are 3 full fledged lineage events
     DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2");
@@ -87,8 +88,8 @@ public class LineageEventTest {
     lineageInfo.putDestination(destination12, 2, state1);
     eventsList = LineageInfo.load(states);
     Assert.assertTrue(eventsList.size() == 3);
-    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
-    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
     verify(getLineageEvent(eventsList, 2, mysql), topic, source, destination12);
 
 
@@ -100,8 +101,8 @@ public class LineageEventTest {
     lineageInfo.putDestination(destination11, 1, state1);
     eventsList = LineageInfo.load(states);
     Assert.assertTrue(eventsList.size() == 4);
-    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
-    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+    Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+    Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
     // Either branch 0 or 2 of state 1 is selected
     LineageEventBuilder event12 = getLineageEvent(eventsList, 0, mysql);
     if (event12 == null) {
@@ -127,8 +128,8 @@ public class LineageEventTest {
     PartitionDescriptor destination = new PartitionDescriptor(partitionName, destinationDataset);
     lineageInfo.putDestination(destination, 0, state);
 
-    Map<String, LineageEventBuilder> events = LineageInfo.load(state);
-    LineageEventBuilder event = events.get("0");
+    Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state);
+    LineageEventBuilder event = first(events.get("0"));
     verify(event, topic, source, destination);
 
     // Verify gobblin tracking event
@@ -170,4 +171,8 @@ public class LineageEventTest {
     Assert.assertTrue(event.getSource().equals(source));
     Assert.assertTrue(event.getDestination().equals(destination));
   }
+
+  private <T> T first(Collection<T> collection) {
+    return collection.iterator().next();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index c78d098..3a28b18 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -130,6 +130,11 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
 
     HadoopUtils.addGobblinSite();
 
+    // Configure root metric context
+    List<Tag<?>> tags = Lists.newArrayList();
+    tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+    RootMetricContext.get(tags);
+
     if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
       Level logLevel = Level.toLevel(props.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO);
       Logger.getLogger("org.apache.gobblin").setLevel(logLevel);
@@ -198,9 +203,6 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
       jobProps = ConfigUtils.configToProperties(resolvedJob);
     }
 
-    List<Tag<?>> tags = Lists.newArrayList();
-    tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
-    RootMetricContext.get(tags);
     GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
 
     // If the job launcher type is not specified in the job configuration,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 0b0e06d..95931a6 100644
--- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -49,8 +49,9 @@ public class SalesforceSourceTest {
     List<WorkUnit> workUnits = source.generateWorkUnits(sourceEntity, sourceState, 20140213000000L);
     Assert.assertEquals(workUnits.size(), 1);
 
-    DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts");
-    Assert.assertEquals(Descriptor.serialize(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source"));
+    String expected = "{\"object-type\":\"org.apache.gobblin.dataset.DatasetDescriptor\","
+        + "\"object-data\":{\"platform\":\"salesforce\",\"metadata\":{},\"name\":\"contacts\"}}";
+    Assert.assertEquals(expected, workUnits.get(0).getProp("gobblin.event.lineage.source"));
     Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts");
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
deleted file mode 100644
index 5973aa9..0000000
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.io;
-
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
-
-import java.io.IOException;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.lang3.ClassUtils;
-
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.internal.Streams;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
-
-
-/**
- * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects.
- *
- * <p>
- *   This adapter will capture all instances of {@link #baseClass} and write them as
- *   {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of
- *   polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the
- *   default GSON writer):
- *   - Primitives and boxed primitives
- *   - Arrays
- *   - Collections
- *   - Maps
- *   Additionally, generic classes (e.g. class MyClass<T>) cannot be correctly decoded.
- * </p>
- *
- * <p>
- *   To use:
- *    <pre>
- *          {@code
- *            MyClass object = new MyClass();
- *            Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class);
- *            String json = gson.toJson(object);
- *            Myclass object2 = gson.fromJson(json, MyClass.class);
- *          }
- *    </pre>
- * </p>
- *
- * <p>
- *   Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize
- *   all types except for java generics.
- * </p>
- *
- * @param <T> The interface or abstract type to be serialized and deserialized with {@link Gson}.
- */
-@RequiredArgsConstructor
-public class GsonInterfaceAdapter implements TypeAdapterFactory {
-
-  protected static final String OBJECT_TYPE = "object-type";
-  protected static final String OBJECT_DATA = "object-data";
-
-  private final Class<?> baseClass;
-
-  @Override
-  public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
-    if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType
-        || CharSequence.class.isAssignableFrom(type.getRawType())
-        || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType())
-            || Map.class.isAssignableFrom(type.getRawType())))) {
-      // delegate primitives, arrays, collections, and maps
-      return null;
-    }
-    if (!this.baseClass.isAssignableFrom(type.getRawType())) {
-      // delegate anything not assignable from base class
-      return null;
-    }
-    TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type);
-    return adapter;
-  }
-
-  @AllArgsConstructor
-  private static class InterfaceAdapter<R> extends TypeAdapter<R> {
-
-    private final Gson gson;
-    private final TypeAdapterFactory factory;
-    private final TypeToken<R> typeToken;
-
-    @Override
-    public void write(JsonWriter out, R value) throws IOException {
-      if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) {
-        Optional opt = (Optional) value;
-        if (opt != null && opt.isPresent()) {
-          Object actualValue = opt.get();
-          writeObject(actualValue, out);
-        } else {
-          out.beginObject();
-          out.endObject();
-        }
-      } else {
-        writeObject(value, out);
-      }
-    }
-
-    @Override
-    public R read(JsonReader in) throws IOException {
-      JsonElement element = Streams.parse(in);
-      if (element.isJsonNull()) {
-        return readNull();
-      }
-      JsonObject jsonObject = element.getAsJsonObject();
-
-      if (this.typeToken.getRawType() == Optional.class) {
-        if (jsonObject.has(OBJECT_TYPE)) {
-          return (R) Optional.of(readValue(jsonObject, null));
-        } else if (jsonObject.entrySet().isEmpty()) {
-          return (R) Optional.absent();
-        } else {
-          throw new IOException("No class found for Optional value.");
-        }
-      }
-      return this.readValue(jsonObject, this.typeToken);
-    }
-
-    private <S> S readNull() {
-      if (this.typeToken.getRawType() == Optional.class) {
-        return (S) Optional.absent();
-      }
-      return null;
-    }
-
-    private <S> void writeObject(S value, JsonWriter out) throws IOException {
-      if (value != null) {
-        JsonObject jsonObject = new JsonObject();
-        jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName()));
-        TypeAdapter<S> delegate =
-            (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass()));
-        jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value));
-        Streams.write(jsonObject, out);
-      } else {
-        out.nullValue();
-      }
-    }
-
-    private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException {
-      try {
-        TypeToken<S> actualTypeToken;
-        if (jsonObject.isJsonNull()) {
-          return null;
-        } else if (jsonObject.has(OBJECT_TYPE)) {
-          String className = jsonObject.get(OBJECT_TYPE).getAsString();
-          Class<S> klazz = (Class<S>) Class.forName(className);
-          actualTypeToken = TypeToken.get(klazz);
-        } else if (defaultTypeToken != null) {
-          actualTypeToken = defaultTypeToken;
-        } else {
-          throw new IOException("Could not determine TypeToken.");
-        }
-        TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, actualTypeToken);
-        S value = delegate.fromJsonTree(jsonObject.get(OBJECT_DATA));
-        return value;
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-    }
-
-  }
-
-  public static <T> Gson getGson(Class<T> clazz) {
-    Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create();
-    return gson;
-  }
-}