You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/09/14 19:31:09 UTC
[2/2] incubator-gobblin git commit: [GOBBLIN-587] Implement partition
level lineage for fs based destination
[GOBBLIN-587] Implement partition level lineage for fs based destination
Closes #2453 from zxcware/pd
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef59a151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef59a151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef59a151
Branch: refs/heads/master
Commit: ef59a1517575f41671b0ec4ffa6ac53b3648e30c
Parents: e74c8b7
Author: zhchen <zh...@linkedin.com>
Authored: Fri Sep 14 12:31:02 2018 -0700
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Fri Sep 14 12:31:02 2018 -0700
----------------------------------------------------------------------
.../org/apache/gobblin/dataset/Descriptor.java | 71 +++----
.../gobblin/dataset/PartitionDescriptor.java | 28 +++
.../gobblin/util/io/GsonInterfaceAdapter.java | 195 +++++++++++++++++++
.../org/apache/gobblin/writer/DataWriter.java | 23 ++-
.../apache/gobblin/dataset/DescriptorTest.java | 13 +-
.../util/io/GsonInterfaceAdapterTest.java | 46 +++++
.../org/apache/gobblin/util/test/BaseClass.java | 37 ++++
.../apache/gobblin/util/test/ExtendedClass.java | 33 ++++
.../org/apache/gobblin/util/test/TestClass.java | 69 +++++++
.../writer/InstrumentedDataWriterDecorator.java | 6 +
.../gobblin/publisher/BaseDataPublisher.java | 31 ++-
.../publisher/TimePartitionedDataPublisher.java | 21 +-
.../writer/CloseOnFlushWriterWrapper.java | 6 +
.../org/apache/gobblin/writer/FsDataWriter.java | 17 ++
.../gobblin/writer/PartitionedDataWriter.java | 72 ++++++-
.../publisher/BaseDataPublisherTest.java | 86 +++++++-
.../gobblin/writer/PartitionedWriterTest.java | 18 ++
.../test/TestPartitionAwareWriterBuilder.java | 9 +
.../dataset/ConvertibleHiveDatasetTest.java | 25 ++-
.../apache/gobblin/metrics/MetricContext.java | 2 +-
.../event/lineage/LineageEventBuilder.java | 18 +-
.../metrics/event/lineage/LineageInfo.java | 68 ++++---
.../metrics/event/lineage/LineageEventTest.java | 27 +--
.../gobblin/azkaban/AzkabanJobLauncher.java | 8 +-
.../salesforce/SalesforceSourceTest.java | 5 +-
.../gobblin/util/io/GsonInterfaceAdapter.java | 195 -------------------
.../util/io/GsonInterfaceAdapterTest.java | 46 -----
.../org/apache/gobblin/util/test/BaseClass.java | 37 ----
.../apache/gobblin/util/test/ExtendedClass.java | 33 ----
.../org/apache/gobblin/util/test/TestClass.java | 69 -------
30 files changed, 810 insertions(+), 504 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
index 2e3daa5..3e30ab8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Descriptor.java
@@ -18,13 +18,18 @@
package org.apache.gobblin.dataset;
import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
import com.google.gson.Gson;
-import com.google.gson.JsonObject;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
+
/**
* A descriptor is a simplified representation of a resource, which could be a dataset, dataset partition, file, etc.
@@ -32,11 +37,6 @@ import lombok.RequiredArgsConstructor;
* primary keys, version, etc
*
* <p>
- * The class provides {@link #serialize(Descriptor)} and {@link #deserialize(String)} util methods pair to send
- * a descriptor object over the wire
- * </p>
- *
- * <p>
* When the original object has complicated inner structure and there is a requirement to send it over the wire,
* it is time to define a corresponding {@link Descriptor}. In this case, the {@link Descriptor} can just
* have minimal information enough to construct the original object on the other side of the wire
@@ -52,7 +52,10 @@ import lombok.RequiredArgsConstructor;
public class Descriptor {
/** Use gson for ser/de */
- private static final Gson GSON = new Gson();
+ public static final Gson GSON =
+ new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+ /** Type token for ser/de descriptor list */
+ private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType();
/** Name of the resource */
@Getter
@@ -68,41 +71,41 @@ public class Descriptor {
}
/**
- * A helper class for ser/de of a {@link Descriptor}
+ * Serialize any {@link Descriptor} object as json string
+ *
+ * <p>
+ * Note: it can serialize subclasses
+ * </p>
+ */
+ public static String toJson(Descriptor descriptor) {
+ return GSON.toJson(descriptor);
+ }
+
+ /**
+ * Deserialize the json string to a {@link Descriptor} object
+ */
+ public static Descriptor fromJson(String json) {
+ return fromJson(json, Descriptor.class);
+ }
+
+ /**
+ * Deserialize the json string to the specified {@link Descriptor} object
*/
- @RequiredArgsConstructor
- private static class Wrap {
- /** The actual class name of the {@link Descriptor} */
- private final String clazz;
- /** A json representation of the {@link Descriptor}*/
- private final JsonObject data;
+ public static <T extends Descriptor> T fromJson(String json, Class<T> clazz) {
+ return GSON.fromJson(json, clazz);
}
/**
- * Serialize any {@link Descriptor} object to a string
+ * Serialize a list of descriptors as json string
*/
- public static String serialize(Descriptor descriptor) {
- if (descriptor == null) {
- return GSON.toJson(null);
- }
- JsonObject data = GSON.toJsonTree(descriptor).getAsJsonObject();
- return GSON.toJson(new Wrap(descriptor.getClass().getName(), data));
+ public static String toJson(List<Descriptor> descriptors) {
+ return GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE);
}
/**
- * Deserialize a string, which results from {@link #serialize(Descriptor)}, into the original
- * {@link Descriptor} object
+ * Deserialize the string, resulting from {@link #toJson(List)}, to a list of descriptors
*/
- public static Descriptor deserialize(String serialized) {
- Wrap wrap = GSON.fromJson(serialized, Wrap.class);
- if (wrap == null) {
- return null;
- }
-
- try {
- return GSON.fromJson(wrap.data, (Type) Class.forName(wrap.clazz));
- } catch (ClassNotFoundException e) {
- return null;
- }
+ public static List<Descriptor> fromJsonList(String jsonList) {
+ return GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
index f0b4dcf..2a5370e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionDescriptor.java
@@ -17,6 +17,12 @@
package org.apache.gobblin.dataset;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.gson.reflect.TypeToken;
+
import lombok.Getter;
@@ -24,6 +30,10 @@ import lombok.Getter;
* A {@link Descriptor} that identifies a partition of a dataset
*/
public class PartitionDescriptor extends Descriptor {
+
+ /** Type token for ser/de partition descriptor list */
+ private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType();
+
@Getter
private final DatasetDescriptor dataset;
@@ -37,6 +47,10 @@ public class PartitionDescriptor extends Descriptor {
return new PartitionDescriptor(getName(), dataset);
}
+ public PartitionDescriptor copyWithNewDataset(DatasetDescriptor dataset) {
+ return new PartitionDescriptor(getName(), dataset);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -56,4 +70,18 @@ public class PartitionDescriptor extends Descriptor {
result = 31 * result + getName().hashCode();
return result;
}
+
+ /**
+ * Serialize a list of partition descriptors as json string
+ */
+ public static String toPartitionJsonList(List<PartitionDescriptor> descriptors) {
+ return Descriptor.GSON.toJson(descriptors, DESCRIPTOR_LIST_TYPE);
+ }
+
+ /**
+ * Deserialize the string, resulting from {@link #toPartitionJsonList(List)}, to a list of partition descriptors
+ */
+ public static List<PartitionDescriptor> fromPartitionJsonList(String jsonList) {
+ return Descriptor.GSON.fromJson(jsonList, DESCRIPTOR_LIST_TYPE);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
new file mode 100644
index 0000000..5973aa9
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import lombok.AllArgsConstructor;
+import lombok.RequiredArgsConstructor;
+
+import java.io.IOException;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.internal.Streams;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+
+/**
+ * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects.
+ *
+ * <p>
+ * This adapter will capture all instances of {@link #baseClass} and write them as
+ * {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of
+ * polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the
+ * default GSON writer):
+ * - Primitives and boxed primitives
+ * - Arrays
+ * - Collections
+ * - Maps
+ * Additionally, generic classes (e.g. class MyClass<T>) cannot be correctly decoded.
+ * </p>
+ *
+ * <p>
+ * To use:
+ * <pre>
+ * {@code
+ * MyClass object = new MyClass();
+ * Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class);
+ * String json = gson.toJson(object);
+ * MyClass object2 = gson.fromJson(json, MyClass.class);
+ * }
+ * </pre>
+ * </p>
+ *
+ * <p>
+ * Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize
+ * all types except for java generics.
+ * </p>
+ *
+ * @param <T> The interface or abstract type to be serialized and deserialized with {@link Gson}.
+ */
+@RequiredArgsConstructor
+public class GsonInterfaceAdapter implements TypeAdapterFactory {
+
+ protected static final String OBJECT_TYPE = "object-type";
+ protected static final String OBJECT_DATA = "object-data";
+
+ private final Class<?> baseClass;
+
+ @Override
+ public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
+ if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType
+ || CharSequence.class.isAssignableFrom(type.getRawType())
+ || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType())
+ || Map.class.isAssignableFrom(type.getRawType())))) {
+ // delegate primitives, arrays, collections, and maps
+ return null;
+ }
+ if (!this.baseClass.isAssignableFrom(type.getRawType())) {
+ // delegate anything not assignable from base class
+ return null;
+ }
+ TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type);
+ return adapter;
+ }
+
+ @AllArgsConstructor
+ private static class InterfaceAdapter<R> extends TypeAdapter<R> {
+
+ private final Gson gson;
+ private final TypeAdapterFactory factory;
+ private final TypeToken<R> typeToken;
+
+ @Override
+ public void write(JsonWriter out, R value) throws IOException {
+ if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) {
+ Optional opt = (Optional) value;
+ if (opt != null && opt.isPresent()) {
+ Object actualValue = opt.get();
+ writeObject(actualValue, out);
+ } else {
+ out.beginObject();
+ out.endObject();
+ }
+ } else {
+ writeObject(value, out);
+ }
+ }
+
+ @Override
+ public R read(JsonReader in) throws IOException {
+ JsonElement element = Streams.parse(in);
+ if (element.isJsonNull()) {
+ return readNull();
+ }
+ JsonObject jsonObject = element.getAsJsonObject();
+
+ if (this.typeToken.getRawType() == Optional.class) {
+ if (jsonObject.has(OBJECT_TYPE)) {
+ return (R) Optional.of(readValue(jsonObject, null));
+ } else if (jsonObject.entrySet().isEmpty()) {
+ return (R) Optional.absent();
+ } else {
+ throw new IOException("No class found for Optional value.");
+ }
+ }
+ return this.readValue(jsonObject, this.typeToken);
+ }
+
+ private <S> S readNull() {
+ if (this.typeToken.getRawType() == Optional.class) {
+ return (S) Optional.absent();
+ }
+ return null;
+ }
+
+ private <S> void writeObject(S value, JsonWriter out) throws IOException {
+ if (value != null) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName()));
+ TypeAdapter<S> delegate =
+ (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass()));
+ jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value));
+ Streams.write(jsonObject, out);
+ } else {
+ out.nullValue();
+ }
+ }
+
+ private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException {
+ try {
+ TypeToken<S> actualTypeToken;
+ if (jsonObject.isJsonNull()) {
+ return null;
+ } else if (jsonObject.has(OBJECT_TYPE)) {
+ String className = jsonObject.get(OBJECT_TYPE).getAsString();
+ Class<S> klazz = (Class<S>) Class.forName(className);
+ actualTypeToken = TypeToken.get(klazz);
+ } else if (defaultTypeToken != null) {
+ actualTypeToken = defaultTypeToken;
+ } else {
+ throw new IOException("Could not determine TypeToken.");
+ }
+ TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, actualTypeToken);
+ S value = delegate.fromJsonTree(jsonObject.get(OBJECT_DATA));
+ return value;
+ } catch (ClassNotFoundException cnfe) {
+ throw new IOException(cnfe);
+ }
+ }
+
+ }
+
+ public static <T> Gson getGson(Class<T> clazz) {
+ Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create();
+ return gson;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
index 18d11d6..400dcdf 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/DataWriter.java
@@ -21,13 +21,18 @@ import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
/**
- * An interface for data writers.
+ * An interface for data writers
+ *
+ * <p>
+ * Generally, one work unit has a dedicated {@link DataWriter} instance, which processes only one dataset
+ * </p>
*
* @param <D> data record type
*
@@ -77,6 +82,22 @@ public interface DataWriter<D> extends Closeable, Flushable {
throws IOException;
/**
+ * The method should return a {@link Descriptor} that represents what the writer is writing
+ *
+ * <p>
+ * Note that, this information might be useless and discarded by a
+ * {@link org.apache.gobblin.publisher.DataPublisher}, which determines the final form of dataset or partition
+ * </p>
+ *
+ * @return a {@link org.apache.gobblin.dataset.DatasetDescriptor} if it writes files of a dataset or
+ * a {@link org.apache.gobblin.dataset.PartitionDescriptor} if it writes files of a dataset partition or
+ * {@code null} if no descriptor applies to this writer
+ */
+ default Descriptor getDataDescriptor() {
+ return null;
+ }
+
+ /**
* Write the input {@link RecordEnvelope}. By default, just call {@link #write(Object)}.
*/
default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
index 6ac875b..d7b0409 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/dataset/DescriptorTest.java
@@ -38,20 +38,15 @@ public class DescriptorTest {
@Test
public void testPartitionDescriptor() {
- // Test serialization
- String partitionJson = "{\"clazz\":\"org.apache.gobblin.dataset.PartitionDescriptor\",\"data\":{\"dataset\":{\"platform\":\"hdfs\",\"metadata\":{},\"name\":\"/data/tracking/PageViewEvent\"},\"name\":\"hourly/2018/08/14/18\"}}";
-
DatasetDescriptor dataset = new DatasetDescriptor("hdfs", "/data/tracking/PageViewEvent");
String partitionName = "hourly/2018/08/14/18";
PartitionDescriptor partition = new PartitionDescriptor(partitionName, dataset);
- Assert.assertEquals(Descriptor.serialize(partition), partitionJson);
- System.out.println(partitionJson);
- Descriptor partition2 = Descriptor.deserialize(partitionJson);
+ // Test copy with new dataset
+ DatasetDescriptor dataset2 = new DatasetDescriptor("hive", "/data/tracking/PageViewEvent");
+ Descriptor partition2 = partition.copyWithNewDataset(dataset2);
Assert.assertEquals(partition2.getName(), partition.getName());
- Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), partition.getDataset());
- Assert.assertEquals(partition, partition2);
- Assert.assertEquals(partition.hashCode(), partition2.hashCode());
+ Assert.assertEquals(((PartitionDescriptor)partition2).getDataset(), dataset2);
// Test copy
PartitionDescriptor partition3 = partition.copy();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
new file mode 100644
index 0000000..17f1ac0
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/io/GsonInterfaceAdapterTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+
+import org.apache.gobblin.util.test.BaseClass;
+import org.apache.gobblin.util.test.TestClass;
+
+
+public class GsonInterfaceAdapterTest {
+
+ @Test(groups = {"gobblin.util.io"})
+ public void test() {
+ Gson gson = GsonInterfaceAdapter.getGson(Object.class);
+
+ TestClass test = new TestClass();
+ test.absent = Optional.absent();
+ Assert.assertNotEquals(test, new TestClass());
+
+ String ser = gson.toJson(test);
+ BaseClass deser = gson.fromJson(ser, BaseClass.class);
+ Assert.assertEquals(test, deser);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
new file mode 100644
index 0000000..4d1d7c7
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/BaseClass.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.Random;
+
+
+/**
+ * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode
+public class BaseClass {
+
+ public BaseClass() {
+ this.field = Integer.toString(new Random().nextInt());
+ }
+
+ private String field;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
new file mode 100644
index 0000000..32d43cc
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/ExtendedClass.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.Random;
+
+
+/**
+ * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class ExtendedClass extends BaseClass {
+
+ private final int otherField = new Random().nextInt();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
new file mode 100644
index 0000000..fd1947c
--- /dev/null
+++ b/gobblin-api/src/test/java/org/apache/gobblin/util/test/TestClass.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.test;
+
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+
+/**
+ * Used for {@link org.apache.gobblin.util.io.GsonInterfaceAdapterTest}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class TestClass extends BaseClass {
+
+ private static final Random random = new Random();
+
+ private final int intValue = random.nextInt();
+ private final long longValue = random.nextLong();
+ private final double doubleValue = random.nextLong();
+ private final Map<String, Integer> map = createRandomMap();
+ private final List<String> list = createRandomList();
+ private final Optional<String> present = Optional.of(Integer.toString(random.nextInt()));
+ // Set manually to absent
+ public Optional<String> absent = Optional.of("a");
+ private final Optional<BaseClass> optionalObject = Optional.of(new BaseClass());
+ private final BaseClass polymorphic = new ExtendedClass();
+ private final Optional<? extends BaseClass> polymorphicOptional = Optional.of(new ExtendedClass());
+
+ private static Map<String, Integer> createRandomMap() {
+ Map<String, Integer> map = Maps.newHashMap();
+ int size = random.nextInt(5);
+ for (int i = 0; i < size; i++) {
+ map.put("value" + random.nextInt(), random.nextInt());
+ }
+ return map;
+ }
+
+ private static List<String> createRandomList() {
+ List<String> list = Lists.newArrayList();
+ int size = random.nextInt(5);
+ for (int i = 0; i < size; i++) {
+ list.add("value" + random.nextInt());
+ }
+ return list;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
index e90f895..efc349f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.records.ControlMessageHandler;
@@ -127,6 +128,11 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa
}
@Override
+ public Descriptor getDataDescriptor() {
+ return this.embeddedWriter.getDataDescriptor();
+ }
+
+ @Override
public Object getDecoratedObject() {
return this.embeddedWriter;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 2ddcd76..56b326e 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.publisher;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -59,19 +60,20 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.metadata.MetadataMerger;
import org.apache.gobblin.metadata.types.StaticStringMetadataMerger;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
-import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
-import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
+import org.apache.gobblin.writer.PartitionedDataWriter;
import static org.apache.gobblin.util.retry.RetryerFactory.*;
@@ -294,12 +296,31 @@ public class BaseDataPublisher extends SingleTaskDataPublisher {
}
private void addLineageInfo(WorkUnitState state, int branchId) {
- DatasetDescriptor destination = createDestinationDescriptor(state, branchId);
- if (this.lineageInfo.isPresent()) {
- this.lineageInfo.get().putDestination(destination, branchId, state);
+ if (!this.lineageInfo.isPresent()) {
+ LOG.info("Will not add lineage info");
+ return;
+ }
+
+ // Final dataset descriptor
+ DatasetDescriptor datasetDescriptor = createDestinationDescriptor(state, branchId);
+
+ List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, branchId);
+ List<Descriptor> descriptors = new ArrayList<>();
+ if (partitions.size() == 0) {
+ // Report as dataset level lineage
+ descriptors.add(datasetDescriptor);
+ } else {
+ // Report as partition level lineage
+ for (PartitionDescriptor partition : partitions) {
+ descriptors.add(partition.copyWithNewDataset(datasetDescriptor));
+ }
}
+ this.lineageInfo.get().putDestination(descriptors, branchId, state);
}
+ /**
+ * Create destination dataset descriptor
+ */
protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
FileSystem fs = this.publisherFileSystemByBranches.get(branchId);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 157552e..e3ca31c 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -18,13 +18,17 @@
package org.apache.gobblin.publisher;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import com.google.common.collect.Lists;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.ForkOperatorUtils;
import org.apache.gobblin.util.ParallelRunner;
@@ -68,21 +72,4 @@ public class TimePartitionedDataPublisher extends BaseDataPublisher {
movePath(parallelRunner, workUnitState, status.getPath(), outputPath, branchId);
}
}
-
- @Override
- protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) {
- // Get base descriptor
- DatasetDescriptor descriptor = super.createDestinationDescriptor(state, branchId);
-
- // Decorate with partition prefix
- String propName = ForkOperatorUtils
- .getPropertyNameForBranch(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, numBranches, branchId);
- String timePrefix = state.getProp(propName, "");
- Path pathWithTimePrefix = new Path(descriptor.getName(), timePrefix);
- DatasetDescriptor destination = new DatasetDescriptor(descriptor.getPlatform(), pathWithTimePrefix.toString());
- // Add back the metadata
- descriptor.getMetadata().forEach(destination::addMetadata);
-
- return destination;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 66e5a26..5e912bb 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.stream.ControlMessage;
@@ -144,6 +145,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
}
@Override
+ public Descriptor getDataDescriptor() {
+ return writer.getDataDescriptor();
+ }
+
+ @Override
public ControlMessageHandler getMessageHandler() {
return this.controlMessageHandler;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index f4e4931..aa228af 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -39,6 +39,10 @@ import org.apache.gobblin.codec.StreamCodec;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.metadata.types.GlobalMetadata;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.util.ForkOperatorUtils;
@@ -155,6 +159,19 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
}
}
+ @Override
+ public Descriptor getDataDescriptor() {
+ // The dataset results from WriterUtils.getWriterOutputDir(properties, this.numBranches, this.branchId)
+ // The writer dataset might not be the same as the published dataset
+ DatasetDescriptor datasetDescriptor = new DatasetDescriptor(fs.getScheme(), outputFile.getParent().toString());
+
+ if (partitionKey == null) {
+ return datasetDescriptor;
+ }
+
+ return new PartitionDescriptor(partitionKey, datasetDescriptor);
+ }
+
/**
* Create the staging output file and an {@link OutputStream} to write to the file.
*
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 83dd074..cdfc11b 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -18,6 +18,8 @@
package org.apache.gobblin.writer;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@@ -29,9 +31,11 @@ import org.apache.commons.lang3.reflect.ConstructorUtils;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import lombok.extern.slf4j.Slf4j;
@@ -39,6 +43,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
@@ -66,6 +72,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
private int writerIdSuffix = 0;
private final String baseWriterId;
+ private final State state;
+ private final int branchId;
+
private final Optional<WriterPartitioner> partitioner;
private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
private final Optional<PartitionAwareDataWriterBuilder> builder;
@@ -78,6 +87,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
public PartitionedDataWriter(DataWriterBuilder<S, D> builder, final State state)
throws IOException {
+ this.state = state;
+ this.branchId = builder.branch;
+
this.isSpeculativeAttemptSafe = true;
this.isWatermarkCapable = true;
this.baseWriterId = builder.getWriterId();
@@ -227,7 +239,11 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
@Override
public void close()
throws IOException {
- this.closer.close();
+ try {
+ serializePartitionInfoToState();
+ } finally {
+ this.closer.close();
+ }
}
private DataWriter<D> createPartitionWriter(GenericRecord partition)
@@ -352,4 +368,58 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
cloner.close();
}
}
+
+ /**
+ * Get the serialized key to partitions info in {@link #state}
+ */
+ private static String getPartitionsKey(int branchId) {
+ return String.format("writer.%d.partitions", branchId);
+ }
+
+ /**
+ * Serialize partitions info to {@link #state} if there are any
+ */
+ private void serializePartitionInfoToState() {
+ List<PartitionDescriptor> descriptors = new ArrayList<>();
+
+ for (DataWriter writer : partitionWriters.asMap().values()) {
+ Descriptor descriptor = writer.getDataDescriptor();
+ if (null == descriptor) {
+ log.warn("Drop partition info as writer {} returns a null PartitionDescriptor", writer.toString());
+ continue;
+ }
+
+ if (!(descriptor instanceof PartitionDescriptor)) {
+ log.warn("Drop partition info as writer {} does not return a PartitionDescriptor", writer.toString());
+ continue;
+ }
+
+ descriptors.add((PartitionDescriptor)descriptor);
+ }
+
+ if (descriptors.size() > 0) {
+ state.setProp(getPartitionsKey(branchId), PartitionDescriptor.toPartitionJsonList(descriptors));
+ } else {
+ log.info("Partitions info not available. Will not serialize partitions");
+ }
+ }
+
+ /**
+ * Get the partition info of a work unit from the {@code state}. The partition info will then be removed from the
+ * {@code state} to avoid persisting useless information
+ *
+ * <p>
+ * In Gobblin, only the {@link PartitionedDataWriter} knows all partitions written for a work unit. Each partition
+ * {@link DataWriter} decides the actual form of a dataset partition
+ * </p>
+ */
+ public static List<PartitionDescriptor> getPartitionInfoAndClean(State state, int branchId) {
+ String partitionsKey = getPartitionsKey(branchId);
+ String json = state.getProp(partitionsKey);
+ if (Strings.isNullOrEmpty(json)) {
+ return Lists.newArrayList();
+ }
+ state.removeProp(partitionsKey);
+ return PartitionDescriptor.fromPartitionJsonList(json);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index d46d6e3..469b72f 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -21,25 +21,27 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -47,16 +49,19 @@ import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.metadata.MetadataMerger;
import org.apache.gobblin.metadata.types.GlobalMetadata;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
@@ -66,6 +71,10 @@ import org.apache.gobblin.writer.PartitionIdentifier;
* Tests for BaseDataPublisher
*/
public class BaseDataPublisherTest {
+ private static final Type PARTITION_LIST_TYPE = new TypeToken<ArrayList<PartitionDescriptor>>(){}.getType();
+ private static final Gson GSON =
+ new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+
/**
* Test DATA_PUBLISHER_METADATA_STR: a user should be able to put an arbitrary metadata string in job configuration
* and have that written out.
@@ -532,6 +541,9 @@ public class BaseDataPublisherTest {
}
}
+ /**
+ * Test lineage info is set on publishing single task
+ */
@Test
public void testPublishSingleTask()
throws IOException {
@@ -545,6 +557,9 @@ public class BaseDataPublisherTest {
Assert.assertFalse(state.contains("gobblin.event.lineage.branch.1.destination"));
}
+ /**
+ * Test lineage info is set on publishing multiple tasks
+ */
@Test
public void testPublishMultiTasks()
throws IOException {
@@ -562,6 +577,69 @@ public class BaseDataPublisherTest {
Assert.assertTrue(state2.contains("gobblin.event.lineage.branch.1.destination"));
}
+ /**
+ * Test partition level lineages are set
+ */
+ @Test
+ public void testPublishedPartitionsLineage()
+ throws IOException {
+ int numBranches = 2;
+ int numPartitionsPerBranch = 2;
+
+ WorkUnitState state = buildTaskState(numBranches);
+ LineageInfo lineageInfo = LineageInfo.getLineageInfo(state.getTaskBroker()).get();
+ DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
+ lineageInfo.setSource(source, state);
+ BaseDataPublisher publisher = new BaseDataPublisher(state);
+
+ // Set up writer partition descriptors
+ DatasetDescriptor datasetAtWriter = new DatasetDescriptor("dummy", "dummy");
+ for (int i = 0; i < numBranches; i++) {
+ List<PartitionDescriptor> partitions = new ArrayList<>();
+ for (int j = 0; j < numPartitionsPerBranch; j++) {
+ // Dummy dataset descriptor will be discarded by publisher
+ partitions.add(new PartitionDescriptor("partition" + i + j, datasetAtWriter));
+ }
+ String partitionsKey = "writer." + i + ".partitions";
+ state.setProp(partitionsKey, GSON.toJson(partitions, PARTITION_LIST_TYPE));
+ }
+
+ publisher.publish(ImmutableList.of(state));
+
+ Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination"));
+ Assert.assertTrue(state.contains("gobblin.event.lineage.branch.1.destination"));
+
+ Collection<LineageEventBuilder> events = LineageInfo.load(ImmutableList.of(state));
+ Assert.assertTrue(events.size() == 4);
+
+ // Find the partition lineage and assert
+ for (int i = 0; i < numBranches; i++) {
+ String outputPath = String.format("/data/output/branch%d/namespace/table", i);
+ DatasetDescriptor destinationDataset = new DatasetDescriptor("file", outputPath);
+ destinationDataset.addMetadata("fsUri", "file:///");
+ destinationDataset.addMetadata("branch", "" + i);
+
+ for (int j = 0; j < numPartitionsPerBranch; j++) {
+ LineageEventBuilder event = find(events, "partition" + i + j);
+ Assert.assertTrue(null != event);
+ Assert.assertEquals(event.getSource(), source);
+ Assert.assertEquals(event.getDestination(),
+ // Dataset written by the writer is discarded
+ new PartitionDescriptor("partition" + i + j, destinationDataset));
+ }
+ }
+ }
+
+ private static LineageEventBuilder find(Collection<LineageEventBuilder> events, String partitionName) {
+ for (LineageEventBuilder event : events) {
+ if (event.getDestination().getName().equals(partitionName)) {
+ return event;
+ }
+ }
+
+ return null;
+ }
+
public static class TestAdditionMerger implements MetadataMerger<String> {
private int sum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 0dc9846..20984ef 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -19,14 +19,18 @@ package org.apache.gobblin.writer;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.ack.BasicAckableForTesting;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.stream.FlushControlMessage;
import org.testng.Assert;
import org.testng.annotations.Test;
+import org.testng.util.Strings;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -107,6 +111,10 @@ public class PartitionedWriterTest {
action = builder.actions.poll();
Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLEANUP);
+ // Before close, partitions info is not serialized
+ String partitionsKey = "writer.0.partitions";
+ Assert.assertTrue(state.getProp(partitionsKey) == null);
+
writer.close();
Assert.assertEquals(builder.actions.size(), 2);
action = builder.actions.poll();
@@ -114,6 +122,16 @@ public class PartitionedWriterTest {
action = builder.actions.poll();
Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.CLOSE);
+ // After close, partitions info is available
+ Assert.assertFalse(Strings.isNullOrEmpty(state.getProp(partitionsKey)));
+ List<PartitionDescriptor> partitions = PartitionedDataWriter.getPartitionInfoAndClean(state, 0);
+ Assert.assertTrue(state.getProp(partitionsKey) == null);
+ Assert.assertEquals(partitions.size(), 2);
+
+ DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset");
+ Assert.assertEquals(partitions.get(0), new PartitionDescriptor("a", dataset));
+ Assert.assertEquals(partitions.get(1), new PartitionDescriptor("1", dataset));
+
writer.commit();
Assert.assertEquals(builder.actions.size(), 2);
action = builder.actions.poll();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
index 8b1e0ca..8da42d7 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/test/TestPartitionAwareWriterBuilder.java
@@ -25,6 +25,9 @@ import org.apache.avro.Schema;
import com.google.common.collect.Queues;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.writer.DataWriter;
import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder;
@@ -118,6 +121,12 @@ public class TestPartitionAwareWriterBuilder extends PartitionAwareDataWriterBui
public boolean isSpeculativeAttemptSafe() {
return true;
}
+
+ @Override
+ public Descriptor getDataDescriptor() {
+ DatasetDescriptor dataset = new DatasetDescriptor("testPlatform", "testDataset");
+ return new PartitionDescriptor(this.partition, dataset);
+ }
}
@Data
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
index eddd852..bfa76c5 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDatasetTest.java
@@ -18,10 +18,13 @@ package org.apache.gobblin.data.management.conversion.hive.dataset;
import com.google.common.base.Optional;
import java.io.InputStream;
+import java.lang.reflect.Type;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -53,18 +56,28 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset.ConversionConfig;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.io.GsonInterfaceAdapter;
import static org.mockito.Mockito.when;
@Test(groups = { "gobblin.data.management.conversion" })
public class ConvertibleHiveDatasetTest {
+
+ /** Lineage info ser/de */
+ private static final Type DESCRIPTOR_LIST_TYPE = new TypeToken<ArrayList<Descriptor>>(){}.getType();
+ private static final Gson GSON =
+ new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(Descriptor.class)).create();
+
+
/**
* Test if lineage information is properly set in the workunit for convertible hive datasets
*/
@@ -73,7 +86,6 @@ public class ConvertibleHiveDatasetTest {
String testConfFilePath = "convertibleHiveDatasetTest/flattenedAndNestedOrc.conf";
Config config = ConfigFactory.parseResources(testConfFilePath).getConfig("hive.conversion.avro");
// Set datasetResolverFactory to convert Hive Lineage event to Hdfs Lineage event
- Gson GSON = new Gson();
ConvertibleHiveDataset testConvertibleDataset = createTestConvertibleDataset(config);
HiveWorkUnit workUnit = new HiveWorkUnit(testConvertibleDataset);
workUnit.setProp("gobblin.broker.lineageInfo.datasetResolverFactory",
@@ -104,7 +116,7 @@ public class ConvertibleHiveDatasetTest {
// Assert that source is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.source"));
DatasetDescriptor sourceDD =
- (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.source"));
+ GSON.fromJson(props.getProperty("gobblin.event.lineage.source"), DatasetDescriptor.class);
Assert.assertEquals(sourceDD.getPlatform(), "file");
Assert.assertEquals(sourceDD.getName(), "/tmp/test");
Assert.assertEquals(sourceDD.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE), "db1.tb1");
@@ -112,7 +124,7 @@ public class ConvertibleHiveDatasetTest {
// Assert that first dest is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.1.destination"));
DatasetDescriptor destDD1 =
- (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.1.destination"));
+ (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.1.destination");
Assert.assertEquals(destDD1.getPlatform(), "file");
Assert.assertEquals(destDD1.getName(), "/tmp/data_nestedOrc/db1/tb1/final");
Assert.assertEquals(destDD1.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
@@ -121,13 +133,18 @@ public class ConvertibleHiveDatasetTest {
// Assert that second dest is correct for lineage event
Assert.assertTrue(props.containsKey("gobblin.event.lineage.branch.2.destination"));
DatasetDescriptor destDD2 =
- (DatasetDescriptor) Descriptor.deserialize(props.getProperty("gobblin.event.lineage.branch.2.destination"));
+ (DatasetDescriptor) firstDescriptor(props, "gobblin.event.lineage.branch.2.destination");
Assert.assertEquals(destDD2.getPlatform(), "file");
Assert.assertEquals(destDD2.getName(), "/tmp/data_flattenedOrc/db1/tb1/final");
Assert.assertEquals(destDD2.getMetadata().get(HiveToHdfsDatasetResolver.HIVE_TABLE),
"db1_flattenedOrcDb.tb1_flattenedOrc");
}
+ private Descriptor firstDescriptor(Properties prop, String destinationKey) {
+ List<Descriptor> descriptors = GSON.fromJson(prop.getProperty(destinationKey), DESCRIPTOR_LIST_TYPE);
+ return descriptors.get(0);
+ }
+
@Test
public void testFlattenedOrcConfig() throws Exception {
String testConfFilePath = "convertibleHiveDatasetTest/flattenedOrc.conf";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
index 46f8ab1..c712c4d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/MetricContext.java
@@ -733,7 +733,7 @@ public class MetricContext extends MetricRegistry implements ReportableContext,
*/
public MetricContext buildStrict() throws NameConflictException {
if(this.parent == null) {
- this.parent = RootMetricContext.get();
+ hasParent(RootMetricContext.get());
}
return new MetricContext(this.name, this.parent, this.tags, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index c920cc3..63d7237 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -17,16 +17,10 @@
package org.apache.gobblin.metrics.event.lineage;
-
import java.util.Map;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.dataset.Descriptor;
-import org.apache.gobblin.metrics.GobblinTrackingEvent;
-import org.apache.gobblin.metrics.event.GobblinEventBuilder;
-
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
@@ -35,6 +29,10 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
/**
* The builder builds a specific {@link GobblinTrackingEvent} whose metadata has {@value GobblinEventBuilder#EVENT_TYPE}
@@ -65,8 +63,8 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
@Override
public GobblinTrackingEvent build() {
Map<String, String> dataMap = Maps.newHashMap(metadata);
- dataMap.put(SOURCE, Descriptor.serialize(source));
- dataMap.put(DESTINATION, Descriptor.serialize(destination));
+ dataMap.put(SOURCE, Descriptor.toJson(source));
+ dataMap.put(DESTINATION, Descriptor.toJson(destination));
return new GobblinTrackingEvent(0L, namespace, name, dataMap);
}
@@ -126,10 +124,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
metadata.forEach((key, value) -> {
switch (key) {
case SOURCE:
- lineageEvent.setSource(Descriptor.deserialize(value));
+ lineageEvent.setSource(Descriptor.fromJson(value));
break;
case DESTINATION:
- lineageEvent.setDestination(Descriptor.deserialize(value));
+ lineageEvent.setDestination(Descriptor.fromJson(value));
break;
default:
lineageEvent.addMetadata(key, value);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 0311df7..3e65105 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -17,7 +17,10 @@
package org.apache.gobblin.metrics.event.lineage;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -25,9 +28,9 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.gson.Gson;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -80,7 +83,6 @@ public final class LineageInfo {
private static final String DATASET_RESOLVER_CONFIG_NAMESPACE = "datasetResolver";
private static final String BRANCH = "branch";
- private static final Gson GSON = new Gson();
private static final String NAME_KEY = "name";
private static final Config FALLBACK =
@@ -112,7 +114,7 @@ public final class LineageInfo {
}
state.setProp(getKey(NAME_KEY), descriptor.getName());
- state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.serialize(descriptor));
+ state.setProp(getKey(LineageEventBuilder.SOURCE), Descriptor.toJson(descriptor));
}
/**
@@ -123,20 +125,41 @@ public final class LineageInfo {
* is supposed to put the destination dataset information. Since different branches may concurrently put,
* the method is implemented to be threadsafe
* </p>
+ *
+ * @deprecated Use {@link #putDestination(List, int, State)}
*/
+ @Deprecated
public void putDestination(Descriptor destination, int branchId, State state) {
+ putDestination(Lists.newArrayList(destination), branchId, state);
+ }
+
+ /**
+ * Put data {@link Descriptor}s of a destination dataset to a state
+ *
+ * @param descriptors It can be a single item list which just has the dataset descriptor or a list
+ * of dataset partition descriptors
+ */
+ public void putDestination(List<Descriptor> descriptors, int branchId, State state) {
+
if (!hasLineageInfo(state)) {
- log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination));
+ log.warn("State has no lineage info but branch " + branchId + " puts {} descriptors", descriptors.size());
return;
}
- log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId));
+
+ log.info(String.format("Put destination %s for branch %d", Descriptor.toJson(descriptors), branchId));
+
synchronized (state.getProp(getKey(NAME_KEY))) {
- Descriptor descriptor = resolver.resolve(destination, state);
- if (descriptor == null) {
- return;
+ List<Descriptor> resolvedDescriptors = new ArrayList<>();
+ for (Descriptor descriptor : descriptors) {
+ Descriptor resolvedDescriptor = resolver.resolve(descriptor, state);
+ if (resolvedDescriptor == null) {
+ continue;
+ }
+ resolvedDescriptors.add(resolvedDescriptor);
}
- state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION), Descriptor.serialize(descriptor));
+ state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION),
+ Descriptor.toJson(resolvedDescriptors));
}
}
@@ -150,8 +173,8 @@ public final class LineageInfo {
Preconditions.checkArgument(states != null && !states.isEmpty());
Set<LineageEventBuilder> allEvents = Sets.newHashSet();
for (State state : states) {
- Map<String, LineageEventBuilder> branchedEvents = load(state);
- allEvents.addAll(branchedEvents.values());
+ Map<String, Set<LineageEventBuilder>> branchedEvents = load(state);
+ branchedEvents.values().forEach(allEvents::addAll);
}
return allEvents;
}
@@ -161,12 +184,12 @@ public final class LineageInfo {
*
* @return A map from branch to its lineage info. If there is no destination info, return an empty map
*/
- static Map<String, LineageEventBuilder> load(State state) {
+ static Map<String, Set<LineageEventBuilder>> load(State state) {
String name = state.getProp(getKey(NAME_KEY));
- Descriptor source = Descriptor.deserialize(state.getProp(getKey(LineageEventBuilder.SOURCE)));
+ Descriptor source = Descriptor.fromJson(state.getProp(getKey(LineageEventBuilder.SOURCE)));
String branchedPrefix = getKey(BRANCH, "");
- Map<String, LineageEventBuilder> events = Maps.newHashMap();
+ Map<String, Set<LineageEventBuilder>> events = Maps.newHashMap();
if (source == null) {
return events;
}
@@ -180,16 +203,17 @@ public final class LineageInfo {
String[] parts = key.substring(branchedPrefix.length()).split("\\.");
assert parts.length == 2;
String branchId = parts[0];
- LineageEventBuilder event = events.get(branchId);
- if (event == null) {
- event = new LineageEventBuilder(name);
- event.setSource(source.copy());
- events.put(parts[0], event);
- }
+ Set<LineageEventBuilder> branchEvents = events.computeIfAbsent(branchId, k -> new HashSet<>());
+
switch (parts[1]) {
case LineageEventBuilder.DESTINATION:
- Descriptor destination = Descriptor.deserialize(entry.getValue().toString());
- event.setDestination(destination);
+ List<Descriptor> descriptors = Descriptor.fromJsonList(entry.getValue().toString());
+ for (Descriptor descriptor : descriptors) {
+ LineageEventBuilder event = new LineageEventBuilder(name);
+ event.setSource(source);
+ event.setDestination(descriptor);
+ branchEvents.add(event);
+ }
break;
default:
throw new RuntimeException("Unsupported lineage key: " + key);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 8ca6d5e..5fd7952 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.metrics.event.lineage;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -65,9 +66,9 @@ public class LineageEventTest {
destination01.addMetadata(branch, "1");
lineageInfo.putDestination(destination01, 1, state0);
- Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
- verify(events.get("0"), topic, source, destination00);
- verify(events.get("1"), topic, source, destination01);
+ Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state0);
+ verify(first(events.get("0")), topic, source, destination00);
+ verify(first(events.get("1")), topic, source, destination01);
State state1 = new State();
lineageInfo.setSource(source, state1);
@@ -78,8 +79,8 @@ public class LineageEventTest {
// Test only full fledged lineage events are loaded
Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
Assert.assertTrue(eventsList.size() == 2);
- Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
- Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+ Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+ Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
// There are 3 full fledged lineage events
DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2");
@@ -87,8 +88,8 @@ public class LineageEventTest {
lineageInfo.putDestination(destination12, 2, state1);
eventsList = LineageInfo.load(states);
Assert.assertTrue(eventsList.size() == 3);
- Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
- Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+ Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+ Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
verify(getLineageEvent(eventsList, 2, mysql), topic, source, destination12);
@@ -100,8 +101,8 @@ public class LineageEventTest {
lineageInfo.putDestination(destination11, 1, state1);
eventsList = LineageInfo.load(states);
Assert.assertTrue(eventsList.size() == 4);
- Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
- Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), events.get("1"));
+ Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), first(events.get("0")));
+ Assert.assertEquals(getLineageEvent(eventsList, 1, mysql), first(events.get("1")));
// Either branch 0 or 2 of state 1 is selected
LineageEventBuilder event12 = getLineageEvent(eventsList, 0, mysql);
if (event12 == null) {
@@ -127,8 +128,8 @@ public class LineageEventTest {
PartitionDescriptor destination = new PartitionDescriptor(partitionName, destinationDataset);
lineageInfo.putDestination(destination, 0, state);
- Map<String, LineageEventBuilder> events = LineageInfo.load(state);
- LineageEventBuilder event = events.get("0");
+ Map<String, Set<LineageEventBuilder>> events = LineageInfo.load(state);
+ LineageEventBuilder event = first(events.get("0"));
verify(event, topic, source, destination);
// Verify gobblin tracking event
@@ -170,4 +171,8 @@ public class LineageEventTest {
Assert.assertTrue(event.getSource().equals(source));
Assert.assertTrue(event.getDestination().equals(destination));
}
+
+ private <T> T first(Collection<T> collection) {
+ return collection.iterator().next();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
index c78d098..3a28b18 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobLauncher.java
@@ -130,6 +130,11 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
HadoopUtils.addGobblinSite();
+ // Configure root metric context
+ List<Tag<?>> tags = Lists.newArrayList();
+ tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+ RootMetricContext.get(tags);
+
if (props.containsKey(GOBBLIN_LOG_LEVEL_KEY)) {
Level logLevel = Level.toLevel(props.getProperty(GOBBLIN_LOG_LEVEL_KEY), Level.INFO);
Logger.getLogger("org.apache.gobblin").setLevel(logLevel);
@@ -198,9 +203,6 @@ public class AzkabanJobLauncher extends AbstractJob implements ApplicationLaunch
jobProps = ConfigUtils.configToProperties(resolvedJob);
}
- List<Tag<?>> tags = Lists.newArrayList();
- tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
- RootMetricContext.get(tags);
GobblinMetrics.addCustomTagsToProperties(jobProps, tags);
// If the job launcher type is not specified in the job configuration,
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 0b0e06d..95931a6 100644
--- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -49,8 +49,9 @@ public class SalesforceSourceTest {
List<WorkUnit> workUnits = source.generateWorkUnits(sourceEntity, sourceState, 20140213000000L);
Assert.assertEquals(workUnits.size(), 1);
- DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts");
- Assert.assertEquals(Descriptor.serialize(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source"));
+ String expected = "{\"object-type\":\"org.apache.gobblin.dataset.DatasetDescriptor\","
+ + "\"object-data\":{\"platform\":\"salesforce\",\"metadata\":{},\"name\":\"contacts\"}}";
+ Assert.assertEquals(expected, workUnits.get(0).getProp("gobblin.event.lineage.source"));
Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef59a151/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
deleted file mode 100644
index 5973aa9..0000000
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/GsonInterfaceAdapter.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.util.io;
-
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
-
-import java.io.IOException;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.commons.lang3.ClassUtils;
-
-import com.google.common.base.Optional;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.internal.Streams;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonWriter;
-
-
-/**
- * A {@link Gson} interface adapter that makes it possible to serialize and deserialize polymorphic objects.
- *
- * <p>
- * This adapter will capture all instances of {@link #baseClass} and write them as
- * {"object-type":"class.name", "object-data":"data"}, allowing for correct serialization and deserialization of
- * polymorphic objects. The following types will not be captured by the adapter (i.e. they will be written by the
- * default GSON writer):
- * - Primitives and boxed primitives
- * - Arrays
- * - Collections
- * - Maps
- * Additionally, generic classes (e.g. class MyClass<T>) cannot be correctly decoded.
- * </p>
- *
- * <p>
- * To use:
- * <pre>
- * {@code
- * MyClass object = new MyClass();
- * Gson gson = GsonInterfaceAdapter.getGson(MyBaseClass.class);
- * String json = gson.toJson(object);
- * Myclass object2 = gson.fromJson(json, MyClass.class);
- * }
- * </pre>
- * </p>
- *
- * <p>
- * Note: a useful case is GsonInterfaceAdapter.getGson(Object.class), which will correctly serialize / deserialize
- * all types except for java generics.
- * </p>
- *
- * @param <T> The interface or abstract type to be serialized and deserialized with {@link Gson}.
- */
-@RequiredArgsConstructor
-public class GsonInterfaceAdapter implements TypeAdapterFactory {
-
- protected static final String OBJECT_TYPE = "object-type";
- protected static final String OBJECT_DATA = "object-data";
-
- private final Class<?> baseClass;
-
- @Override
- public <R> TypeAdapter<R> create(Gson gson, TypeToken<R> type) {
- if (ClassUtils.isPrimitiveOrWrapper(type.getRawType()) || type.getType() instanceof GenericArrayType
- || CharSequence.class.isAssignableFrom(type.getRawType())
- || (type.getType() instanceof ParameterizedType && (Collection.class.isAssignableFrom(type.getRawType())
- || Map.class.isAssignableFrom(type.getRawType())))) {
- // delegate primitives, arrays, collections, and maps
- return null;
- }
- if (!this.baseClass.isAssignableFrom(type.getRawType())) {
- // delegate anything not assignable from base class
- return null;
- }
- TypeAdapter<R> adapter = new InterfaceAdapter<>(gson, this, type);
- return adapter;
- }
-
- @AllArgsConstructor
- private static class InterfaceAdapter<R> extends TypeAdapter<R> {
-
- private final Gson gson;
- private final TypeAdapterFactory factory;
- private final TypeToken<R> typeToken;
-
- @Override
- public void write(JsonWriter out, R value) throws IOException {
- if (Optional.class.isAssignableFrom(this.typeToken.getRawType())) {
- Optional opt = (Optional) value;
- if (opt != null && opt.isPresent()) {
- Object actualValue = opt.get();
- writeObject(actualValue, out);
- } else {
- out.beginObject();
- out.endObject();
- }
- } else {
- writeObject(value, out);
- }
- }
-
- @Override
- public R read(JsonReader in) throws IOException {
- JsonElement element = Streams.parse(in);
- if (element.isJsonNull()) {
- return readNull();
- }
- JsonObject jsonObject = element.getAsJsonObject();
-
- if (this.typeToken.getRawType() == Optional.class) {
- if (jsonObject.has(OBJECT_TYPE)) {
- return (R) Optional.of(readValue(jsonObject, null));
- } else if (jsonObject.entrySet().isEmpty()) {
- return (R) Optional.absent();
- } else {
- throw new IOException("No class found for Optional value.");
- }
- }
- return this.readValue(jsonObject, this.typeToken);
- }
-
- private <S> S readNull() {
- if (this.typeToken.getRawType() == Optional.class) {
- return (S) Optional.absent();
- }
- return null;
- }
-
- private <S> void writeObject(S value, JsonWriter out) throws IOException {
- if (value != null) {
- JsonObject jsonObject = new JsonObject();
- jsonObject.add(OBJECT_TYPE, new JsonPrimitive(value.getClass().getName()));
- TypeAdapter<S> delegate =
- (TypeAdapter<S>) this.gson.getDelegateAdapter(this.factory, TypeToken.get(value.getClass()));
- jsonObject.add(OBJECT_DATA, delegate.toJsonTree(value));
- Streams.write(jsonObject, out);
- } else {
- out.nullValue();
- }
- }
-
- private <S> S readValue(JsonObject jsonObject, TypeToken<S> defaultTypeToken) throws IOException {
- try {
- TypeToken<S> actualTypeToken;
- if (jsonObject.isJsonNull()) {
- return null;
- } else if (jsonObject.has(OBJECT_TYPE)) {
- String className = jsonObject.get(OBJECT_TYPE).getAsString();
- Class<S> klazz = (Class<S>) Class.forName(className);
- actualTypeToken = TypeToken.get(klazz);
- } else if (defaultTypeToken != null) {
- actualTypeToken = defaultTypeToken;
- } else {
- throw new IOException("Could not determine TypeToken.");
- }
- TypeAdapter<S> delegate = this.gson.getDelegateAdapter(this.factory, actualTypeToken);
- S value = delegate.fromJsonTree(jsonObject.get(OBJECT_DATA));
- return value;
- } catch (ClassNotFoundException cnfe) {
- throw new IOException(cnfe);
- }
- }
-
- }
-
- public static <T> Gson getGson(Class<T> clazz) {
- Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonInterfaceAdapter(clazz)).create();
- return gson;
- }
-}