You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/06 06:21:10 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-676] Add record
metadata support to the RecordEnvelope
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5b226d4 [GOBBLIN-676] Add record metadata support to the RecordEnvelope
5b226d4 is described below
commit 5b226d4d21cad0ea99fae9d025bbc75805bbd98f
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Tue Feb 5 22:21:07 2019 -0800
[GOBBLIN-676] Add record metadata support to the RecordEnvelope
Closes #2546 from htran1/record_envelope_metadata
---
.../org/apache/gobblin/converter/Converter.java | 53 +++++----
.../org/apache/gobblin/stream/RecordEnvelope.java | 46 ++++++++
.../apache/gobblin/converter/ConverterTest.java | 53 +++++++++
.../apache/gobblin/stream/RecordEnvelopeTest.java | 125 +++++++++++++++++++++
4 files changed, 257 insertions(+), 20 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index 514f5be..b022304 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -111,6 +111,38 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
throws DataConversionException;
/**
+ * Converts a {@link RecordEnvelope} by passing its record to {@code convertRecord} and wrapping each output record.
+ * Implementations that need to manipulate the {@link RecordEnvelope}, such as to set watermarks or metadata, can override it.
+ * @param outputSchema output schema converted using the {@link Converter#convertSchema} method
+ * @param inputRecordEnvelope input record envelope with data record to be converted
+ * @param workUnitState a {@link WorkUnitState} object carrying configuration properties
+ * @return a {@link Flowable} emitting the converted {@link RecordEnvelope}s; empty when the conversion yields no records
+ * @throws DataConversionException declared because the {@code convertRecord} call made here may throw it
+ */
+ protected Flowable<RecordEnvelope<DO>> convertRecordEnvelope(SO outputSchema, RecordEnvelope<DI> inputRecordEnvelope,
+ WorkUnitState workUnitState) throws DataConversionException {
+ Iterator<DO> convertedIterable = convertRecord(outputSchema,
+ inputRecordEnvelope.getRecord(), workUnitState).iterator();
+
+ if (!convertedIterable.hasNext()) {
+ inputRecordEnvelope.ack();
+ // the iterable is empty: the input record has been acked above, so just return an empty flowable
+ return Flowable.empty();
+ }
+
+ DO firstRecord = convertedIterable.next();
+ if (!convertedIterable.hasNext()) {
+ // if the iterable has only one element, use RecordEnvelope.withRecord, which is more efficient
+ return Flowable.just(inputRecordEnvelope.withRecord(firstRecord));
+ } else {
+ // if the iterable has multiple records, use a ForkRecordBuilder: one child envelope per record, closed on completion
+ RecordEnvelope<DI>.ForkRecordBuilder<DO> forkRecordBuilder = inputRecordEnvelope.forkRecordBuilder();
+ return Flowable.just(firstRecord).concatWith(Flowable.fromIterable(() -> convertedIterable))
+ .map(forkRecordBuilder::childRecord).doOnComplete(forkRecordBuilder::close);
+ }
+ }
+
+ /**
* Get final state for this object. By default this returns an empty {@link org.apache.gobblin.configuration.State}, but
* concrete subclasses can add information that will be added to the task state.
* @return Empty {@link org.apache.gobblin.configuration.State}.
@@ -149,26 +181,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
return Flowable.just(((ControlMessage<DO>) out));
} else if (in instanceof RecordEnvelope) {
RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
- Iterator<DO> convertedIterable = convertRecord(this.outputGlobalMetadata.getSchema(),
- recordEnvelope.getRecord(), workUnitState).iterator();
-
- if (!convertedIterable.hasNext()) {
- // if the iterable is empty, ack the record, return an empty flowable
- in.ack();
- return Flowable.empty();
- }
-
- DO firstRecord = convertedIterable.next();
- if (!convertedIterable.hasNext()) {
- // if the iterable has only one element, use RecordEnvelope.withRecord, which is more efficient
- return Flowable.just(recordEnvelope.withRecord(firstRecord));
- } else {
- // if the iterable has multiple records, use a ForkRecordBuilder
- RecordEnvelope<DI>.ForkRecordBuilder<DO> forkRecordBuilder = recordEnvelope.forkRecordBuilder();
- return Flowable.just(firstRecord).concatWith(Flowable.fromIterable(() -> convertedIterable))
- .map(forkRecordBuilder::childRecord).doOnComplete(forkRecordBuilder::close);
- }
-
+ return convertRecordEnvelope(this.outputGlobalMetadata.getSchema(), recordEnvelope, workUnitState);
} else {
throw new UnsupportedOperationException();
}
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
index edfa1f0..f14372e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
@@ -17,6 +17,9 @@
package org.apache.gobblin.stream;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.fork.CopyHelper;
import org.apache.gobblin.fork.CopyNotSupportedException;
@@ -52,6 +55,12 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
private final D _record;
@Nullable
private final CheckpointableWatermark _watermark;
+ /**
+ * The container is lazily created when the first entry is set. Copies of the {@link RecordEnvelope} will copy the
+ * top-level entries into a new container, but the values will not be cloned. So adding new entries has no effect on
+ * copies, but values should not be modified in-place if the intention is to not affect the copies.
+ */
+ private Map<String, Object> _recordMetadata;
public RecordEnvelope(D record) {
this(record, (CheckpointableWatermark) null);
@@ -61,12 +70,22 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
super(parentRecord, copyCallbacks);
_record = record;
_watermark = parentRecord._watermark;
+
+ if (parentRecord._recordMetadata != null) {
+ _recordMetadata = new HashMap<>();
+ _recordMetadata.putAll(parentRecord._recordMetadata);
+ }
}
private RecordEnvelope(D record, RecordEnvelope<?>.ForkRecordBuilder<D> forkRecordBuilder, boolean copyCallbacks) {
super(forkRecordBuilder, copyCallbacks);
_record = record;
_watermark = forkRecordBuilder.getRecordEnvelope()._watermark;
+
+ if (forkRecordBuilder.getRecordEnvelope()._recordMetadata != null) {
+ _recordMetadata = new HashMap<>();
+ _recordMetadata.putAll(forkRecordBuilder.getRecordEnvelope()._recordMetadata);
+ }
}
public RecordEnvelope(D record, CheckpointableWatermark watermark) {
@@ -77,6 +96,7 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
_record = record;
_watermark = watermark;
+ _recordMetadata = null;
}
/**
@@ -100,6 +120,32 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
return _watermark;
}
+ /**
+ * @return The value stored under the given key, or null if the metadata container was never created or has no value for it
+ */
+ public Object getRecordMetadata(String key) {
+ if (_recordMetadata != null) {
+ return _recordMetadata.get(key);
+ }
+
+ return null;
+ }
+
+ /**
+ * Set a record metadata entry, creating the metadata container if this is the first entry
+ * @param key key for the metadata
+ * @param value value of the metadata
+ *
+ * @implNote should not be called concurrently: the container is created and updated without synchronization
+ */
+ public void setRecordMetadata(String key, Object value) {
+ if (_recordMetadata == null) {
+ _recordMetadata = new HashMap<>();
+ }
+
+ _recordMetadata.put(key, value);
+ }
+
@Override
protected StreamEntity<D> buildClone() {
try {
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
index 00ac4f9..cf30a5d 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
@@ -123,6 +123,33 @@ public class ConverterTest {
Assert.assertTrue(outputRecords.get(1) instanceof MyControlMessage);
}
+ @Test
+ public void testAddRecordMetadata() throws Exception {
+ MyConverter2 converter = new MyConverter2();
+ BasicAckableForTesting ackable = new BasicAckableForTesting();
+
+ RecordStreamWithMetadata<Integer, String> stream =
+ new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new RecordEnvelope<>(2)),
+ GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
+ r.addCallBack(ackable);
+ return r;
+ });
+
+ List<StreamEntity<Integer>> outputRecords = Lists.newArrayList();
+ converter.processStream(stream, new WorkUnitState()).getRecordStream().subscribe(outputRecords::add);
+
+ Assert.assertEquals(outputRecords.size(), 2);
+
+ RecordEnvelope<Integer> envelope = (RecordEnvelope<Integer>)outputRecords.get(0);
+ Assert.assertEquals(envelope.getRecord().intValue(), 2);
+ Assert.assertEquals(((Integer)envelope.getRecordMetadata("original_value")).intValue(), 1);
+
+ envelope = (RecordEnvelope<Integer>)outputRecords.get(1);
+ Assert.assertEquals(envelope.getRecord().intValue(), 3);
+ Assert.assertEquals(((Integer)envelope.getRecordMetadata("original_value")).intValue(), 2);
+
+ }
+
public static class MyConverter extends Converter<String, String, Integer, Integer> {
@Override
public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
@@ -140,6 +167,32 @@ public class ConverterTest {
}
}
+ // for testing the overriding of convertRecordEnvelope: emits record + 1 and stores the input as "original_value" metadata; convertRecord throws so it cannot be the path used
+ public static class MyConverter2 extends Converter<String, String, Integer, Integer> {
+ @Override
+ public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+ return inputSchema;
+ }
+
+ @Override
+ public Iterable<Integer> convertRecord(String outputSchema, Integer inputRecord, WorkUnitState workUnit)
+ throws DataConversionException {
+ throw new UnsupportedOperationException("not supported");
+ }
+
+ @Override
+ public Flowable<RecordEnvelope<Integer>> convertRecordEnvelope(String outputSchema,
+ RecordEnvelope<Integer> inputRecordEnvelope, WorkUnitState workUnitState)
+ throws DataConversionException {
+
+ RecordEnvelope<Integer> outputRecord =
+ inputRecordEnvelope.withRecord(Integer.valueOf(inputRecordEnvelope.getRecord() + 1));
+ outputRecord.setRecordMetadata("original_value", inputRecordEnvelope.getRecord());
+
+ return Flowable.just(outputRecord);
+ }
+ }
+
public static class MyControlMessage<D> extends ControlMessage<D> {
@Override
protected StreamEntity<D> buildClone() {
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java b/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
index 2e276fb..f838b9f 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.stream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Random;
import org.testng.Assert;
@@ -122,6 +124,129 @@ public class RecordEnvelopeTest {
Assert.assertEquals(ackable.acked, 1);
}
+ @Test
+ public void testRecordMetadata() {
+ RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+ record.setRecordMetadata("meta1", "value1");
+ Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+ }
+
+ @Test
+ public void testRecordMetadataWithDerivedRecords() {
+ RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+ record.setRecordMetadata("meta1", "value1");
+ List list = new ArrayList();
+ list.add("item1");
+ record.setRecordMetadata("list", list);
+
+ RecordEnvelope<String>.ForkRecordBuilder<String> forkRecordBuilder = record.forkRecordBuilder();
+
+ RecordEnvelope<String> derived1 = forkRecordBuilder.childRecord("testDerived1");
+ RecordEnvelope<String> derived2 = forkRecordBuilder.childRecord("testDerived2");
+ RecordEnvelope<String> derived3 = derived2.withRecord("testDerived3");
+
+
+ forkRecordBuilder.close();
+
+ record.setRecordMetadata("meta2", "value2");
+ derived1.setRecordMetadata("meta3", "value3");
+ derived2.setRecordMetadata("meta4", "value4");
+ derived3.setRecordMetadata("meta5", "value5");
+
+ // derived records should inherit the metadata that was present when they were created
+ Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(derived1.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(derived2.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(derived3.getRecordMetadata("meta1"), "value1");
+
+ // entries added afterwards should be visible only on the envelope they were set on
+ Assert.assertEquals(record.getRecordMetadata("meta2"), "value2");
+ Assert.assertNull(derived1.getRecordMetadata("meta2"));
+ Assert.assertNull(derived2.getRecordMetadata("meta2"));
+ Assert.assertNull(derived3.getRecordMetadata("meta2"));
+
+ Assert.assertEquals(derived1.getRecordMetadata("meta3"), "value3");
+ Assert.assertNull(record.getRecordMetadata("meta3"));
+ Assert.assertNull(derived2.getRecordMetadata("meta3"));
+ Assert.assertNull(derived3.getRecordMetadata("meta3"));
+
+ Assert.assertEquals(derived2.getRecordMetadata("meta4"), "value4");
+ Assert.assertNull(derived1.getRecordMetadata("meta4"));
+ Assert.assertNull(derived3.getRecordMetadata("meta4"));
+ Assert.assertNull(record.getRecordMetadata("meta4"));
+
+ Assert.assertEquals(derived3.getRecordMetadata("meta5"), "value5");
+ Assert.assertNull(derived1.getRecordMetadata("meta5"));
+ Assert.assertNull(derived2.getRecordMetadata("meta5"));
+ Assert.assertNull(record.getRecordMetadata("meta5"));
+
+ // no deep copy for values: an in-place change to the shared list is seen through every envelope
+ ((List)record.getRecordMetadata("list")).add("item2");
+ Assert.assertEquals(record.getRecordMetadata("list"), list);
+ Assert.assertEquals(derived1.getRecordMetadata("list"), list);
+ Assert.assertEquals(derived2.getRecordMetadata("list"), list);
+ Assert.assertEquals(derived3.getRecordMetadata("list"), list);
+ }
+
+ @Test
+ public void testRecordMetadataWithClones() {
+ RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+ record.setRecordMetadata("meta1", "value1");
+ List list = new ArrayList();
+ list.add("item1");
+ record.setRecordMetadata("list", list);
+
+ StreamEntity.ForkCloner cloner = record.forkCloner();
+
+ RecordEnvelope<String> copy1 = (RecordEnvelope<String>) cloner.getClone();
+ RecordEnvelope<String> copy2 = (RecordEnvelope<String>) cloner.getClone();
+ cloner.close();
+ RecordEnvelope<String> copy3 = (RecordEnvelope<String>)record.buildClone();
+
+
+ record.setRecordMetadata("meta2", "value2");
+ copy1.setRecordMetadata("meta3", "value3");
+ copy2.setRecordMetadata("meta4", "value4");
+ copy3.setRecordMetadata("meta5", "value5");
+
+ // clones should inherit the metadata that was present at the time of the copy
+ Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(copy1.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(copy2.getRecordMetadata("meta1"), "value1");
+ Assert.assertEquals(copy3.getRecordMetadata("meta1"), "value1");
+
+ // entries added after the copy should be visible only on the envelope they were set on
+ Assert.assertEquals(record.getRecordMetadata("meta2"), "value2");
+ Assert.assertNull(copy1.getRecordMetadata("meta2"));
+ Assert.assertNull(copy2.getRecordMetadata("meta2"));
+ Assert.assertNull(copy3.getRecordMetadata("meta2"));
+
+ Assert.assertEquals(copy1.getRecordMetadata("meta3"), "value3");
+ Assert.assertNull(record.getRecordMetadata("meta3"));
+ Assert.assertNull(copy2.getRecordMetadata("meta3"));
+ Assert.assertNull(copy3.getRecordMetadata("meta3"));
+
+ Assert.assertEquals(copy2.getRecordMetadata("meta4"), "value4");
+ Assert.assertNull(copy1.getRecordMetadata("meta4"));
+ Assert.assertNull(copy3.getRecordMetadata("meta4"));
+ Assert.assertNull(record.getRecordMetadata("meta4"));
+
+ Assert.assertEquals(copy3.getRecordMetadata("meta5"), "value5");
+ Assert.assertNull(copy1.getRecordMetadata("meta5"));
+ Assert.assertNull(copy2.getRecordMetadata("meta5"));
+ Assert.assertNull(record.getRecordMetadata("meta5"));
+
+ // no deep copy for values: an in-place change to the shared list is seen through every clone
+ ((List)record.getRecordMetadata("list")).add("item2");
+ Assert.assertEquals(record.getRecordMetadata("list"), list);
+ Assert.assertEquals(copy1.getRecordMetadata("list"), list);
+ Assert.assertEquals(copy2.getRecordMetadata("list"), list);
+ Assert.assertEquals(copy3.getRecordMetadata("list"), list);
+ }
+
@AllArgsConstructor
public static class MyWatermark implements CheckpointableWatermark {
@Getter