You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/02/06 06:21:10 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-676] Add record metadata support to the RecordEnvelope

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b226d4  [GOBBLIN-676] Add record metadata support to the RecordEnvelope
5b226d4 is described below

commit 5b226d4d21cad0ea99fae9d025bbc75805bbd98f
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Tue Feb 5 22:21:07 2019 -0800

    [GOBBLIN-676] Add record metadata support to the RecordEnvelope
    
    Closes #2546 from htran1/record_envelope_metadata
---
 .../org/apache/gobblin/converter/Converter.java    |  53 +++++----
 .../org/apache/gobblin/stream/RecordEnvelope.java  |  46 ++++++++
 .../apache/gobblin/converter/ConverterTest.java    |  53 +++++++++
 .../apache/gobblin/stream/RecordEnvelopeTest.java  | 125 +++++++++++++++++++++
 4 files changed, 257 insertions(+), 20 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index 514f5be..b022304 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -111,6 +111,38 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
       throws DataConversionException;
 
   /**
+   * Converts a {@link RecordEnvelope}. This method can be overridden by implementations that need to manipulate the
+   * {@link RecordEnvelope}, such as to set watermarks or metadata.
+   * @param outputSchema output schema converted using the {@link Converter#convertSchema} method
+   * @param inputRecordEnvelope input record envelope with data record to be converted
+   * @param workUnitState a {@link WorkUnitState} object carrying configuration properties
+   * @return a {@link Flowable} emitting the converted {@link RecordEnvelope}s
+   * @throws DataConversionException if {@link #convertRecord} fails to convert the wrapped record
+   */
+  protected Flowable<RecordEnvelope<DO>> convertRecordEnvelope(SO outputSchema, RecordEnvelope<DI> inputRecordEnvelope,
+      WorkUnitState workUnitState) throws DataConversionException {
+    Iterator<DO> convertedIterable = convertRecord(outputSchema,
+        inputRecordEnvelope.getRecord(), workUnitState).iterator();
+
+    if (!convertedIterable.hasNext()) {
+      inputRecordEnvelope.ack();
+      // if the iterable is empty, ack the record, return an empty flowable
+      return Flowable.empty();
+    }
+
+    DO firstRecord = convertedIterable.next();
+    if (!convertedIterable.hasNext()) {
+      // if the iterable has only one element, use RecordEnvelope.withRecord, which is more efficient
+      return Flowable.just(inputRecordEnvelope.withRecord(firstRecord));
+    } else {
+      // if the iterable has multiple records, use a ForkRecordBuilder
+      RecordEnvelope<DI>.ForkRecordBuilder<DO> forkRecordBuilder = inputRecordEnvelope.forkRecordBuilder();
+      return Flowable.just(firstRecord).concatWith(Flowable.fromIterable(() -> convertedIterable))
+          .map(forkRecordBuilder::childRecord).doOnComplete(forkRecordBuilder::close);
+    }
+  }
+
+  /**
    * Get final state for this object. By default this returns an empty {@link org.apache.gobblin.configuration.State}, but
    * concrete subclasses can add information that will be added to the task state.
    * @return Empty {@link org.apache.gobblin.configuration.State}.
@@ -149,26 +181,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
                 return Flowable.just(((ControlMessage<DO>) out));
               } else if (in instanceof RecordEnvelope) {
                 RecordEnvelope<DI> recordEnvelope = (RecordEnvelope<DI>) in;
-                Iterator<DO> convertedIterable = convertRecord(this.outputGlobalMetadata.getSchema(),
-                    recordEnvelope.getRecord(), workUnitState).iterator();
-
-                if (!convertedIterable.hasNext()) {
-                  // if the iterable is empty, ack the record, return an empty flowable
-                  in.ack();
-                  return Flowable.empty();
-                }
-
-                DO firstRecord = convertedIterable.next();
-                if (!convertedIterable.hasNext()) {
-                  // if the iterable has only one element, use RecordEnvelope.withRecord, which is more efficient
-                  return Flowable.just(recordEnvelope.withRecord(firstRecord));
-                } else {
-                  // if the iterable has multiple records, use a ForkRecordBuilder
-                  RecordEnvelope<DI>.ForkRecordBuilder<DO> forkRecordBuilder = recordEnvelope.forkRecordBuilder();
-                  return Flowable.just(firstRecord).concatWith(Flowable.fromIterable(() -> convertedIterable))
-                      .map(forkRecordBuilder::childRecord).doOnComplete(forkRecordBuilder::close);
-                }
-
+                return convertRecordEnvelope(this.outputGlobalMetadata.getSchema(), recordEnvelope, workUnitState);
               } else {
                 throw new UnsupportedOperationException();
               }
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java b/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
index edfa1f0..f14372e 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/stream/RecordEnvelope.java
@@ -17,6 +17,9 @@
 
 package org.apache.gobblin.stream;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.fork.CopyHelper;
 import org.apache.gobblin.fork.CopyNotSupportedException;
@@ -52,6 +55,12 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
   private final D _record;
   @Nullable
   private final CheckpointableWatermark _watermark;
+  /**
+   * The container is lazily created when the first entry is set. Copies of the {@link RecordEnvelope} will copy the
+   * top-level entries into a new container, but the values will not be cloned. So adding new entries has no effect on
+   * copies, but values should not be modified in-place if the intention is to not affect the copies.
+   */
+  private Map<String, Object> _recordMetadata;
 
   public RecordEnvelope(D record) {
     this(record, (CheckpointableWatermark) null);
@@ -61,12 +70,22 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
     super(parentRecord, copyCallbacks);
     _record = record;
     _watermark = parentRecord._watermark;
+
+    if (parentRecord._recordMetadata != null) {
+      _recordMetadata = new HashMap<>();
+      _recordMetadata.putAll(parentRecord._recordMetadata);
+    }
   }
 
   private RecordEnvelope(D record, RecordEnvelope<?>.ForkRecordBuilder<D> forkRecordBuilder, boolean copyCallbacks) {
     super(forkRecordBuilder, copyCallbacks);
     _record = record;
     _watermark = forkRecordBuilder.getRecordEnvelope()._watermark;
+
+    if (forkRecordBuilder.getRecordEnvelope()._recordMetadata != null) {
+      _recordMetadata = new HashMap<>();
+      _recordMetadata.putAll(forkRecordBuilder.getRecordEnvelope()._recordMetadata);
+    }
   }
 
   public RecordEnvelope(D record, CheckpointableWatermark watermark) {
@@ -77,6 +96,7 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
 
     _record = record;
     _watermark = watermark;
+    _recordMetadata = null;
   }
 
   /**
@@ -100,6 +120,32 @@ public class RecordEnvelope<D> extends StreamEntity<D> {
     return _watermark;
   }
 
+  /**
+   * @return The record metadata with the given key or null if not present
+   */
+  public Object getRecordMetadata(String key) {
+    if (_recordMetadata != null) {
+      return _recordMetadata.get(key);
+    }
+
+    return null;
+  }
+
+  /**
+   * Set the record metadata
+   * @param key key for the metadata
+   * @param value value of the metadata
+   *
+   * @implNote should not be called concurrently
+   */
+  public void setRecordMetadata(String key, Object value) {
+    if (_recordMetadata == null) {
+      _recordMetadata = new HashMap<>();
+    }
+
+    _recordMetadata.put(key, value);
+  }
+
   @Override
   protected StreamEntity<D> buildClone() {
     try {
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
index 00ac4f9..cf30a5d 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/converter/ConverterTest.java
@@ -123,6 +123,33 @@ public class ConverterTest {
     Assert.assertTrue(outputRecords.get(1) instanceof MyControlMessage);
   }
 
+  @Test
+  public void testAddRecordMetadata() throws Exception {
+    MyConverter2 converter = new MyConverter2();
+    BasicAckableForTesting ackable = new BasicAckableForTesting();
+
+    RecordStreamWithMetadata<Integer, String> stream =
+        new RecordStreamWithMetadata<>(Flowable.just(new RecordEnvelope<>(1), new RecordEnvelope<>(2)),
+            GlobalMetadata.<String>builder().schema("schema").build()).mapRecords(r -> {
+          r.addCallBack(ackable);
+          return r;
+        });
+
+    List<StreamEntity<Integer>> outputRecords = Lists.newArrayList();
+    converter.processStream(stream, new WorkUnitState()).getRecordStream().subscribe(outputRecords::add);
+
+    Assert.assertEquals(outputRecords.size(), 2);
+
+    RecordEnvelope<Integer> envelope = (RecordEnvelope<Integer>)outputRecords.get(0);
+    Assert.assertEquals(envelope.getRecord().intValue(), 2);
+    Assert.assertEquals(((Integer)envelope.getRecordMetadata("original_value")).intValue(), 1);
+
+    envelope = (RecordEnvelope<Integer>)outputRecords.get(1);
+    Assert.assertEquals(envelope.getRecord().intValue(), 3);
+    Assert.assertEquals(((Integer)envelope.getRecordMetadata("original_value")).intValue(), 2);
+
+  }
+
   public static class MyConverter extends Converter<String, String, Integer, Integer> {
     @Override
     public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
@@ -140,6 +167,32 @@ public class ConverterTest {
     }
   }
 
+  // for testing the overriding of convertRecordEnvelope to add record metadata
+  public static class MyConverter2 extends Converter<String, String, Integer, Integer> {
+    @Override
+    public String convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+      return inputSchema;
+    }
+
+    @Override
+    public Iterable<Integer> convertRecord(String outputSchema, Integer inputRecord, WorkUnitState workUnit)
+        throws DataConversionException {
+      throw new UnsupportedOperationException("not supported");
+    }
+
+    @Override
+    public Flowable<RecordEnvelope<Integer>> convertRecordEnvelope(String outputSchema,
+        RecordEnvelope<Integer> inputRecordEnvelope, WorkUnitState workUnitState)
+        throws DataConversionException {
+
+      RecordEnvelope<Integer> outputRecord =
+          inputRecordEnvelope.withRecord(Integer.valueOf(inputRecordEnvelope.getRecord() + 1));
+      outputRecord.setRecordMetadata("original_value", inputRecordEnvelope.getRecord());
+
+      return Flowable.just(outputRecord);
+    }
+  }
+
   public static class MyControlMessage<D> extends ControlMessage<D> {
     @Override
     protected StreamEntity<D> buildClone() {
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java b/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
index 2e276fb..f838b9f 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/stream/RecordEnvelopeTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.stream;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 import org.testng.Assert;
@@ -122,6 +124,129 @@ public class RecordEnvelopeTest {
     Assert.assertEquals(ackable.acked, 1);
   }
 
+  @Test
+  public void testRecordMetadata() {
+    RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+    record.setRecordMetadata("meta1", "value1");
+    Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+  }
+
+  @Test
+  public void testRecordMetadataWithDerivedRecords() {
+    RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+    record.setRecordMetadata("meta1", "value1");
+    List list = new ArrayList();
+    list.add("item1");
+    record.setRecordMetadata("list", list);
+
+    RecordEnvelope<String>.ForkRecordBuilder<String> forkRecordBuilder = record.forkRecordBuilder();
+
+    RecordEnvelope<String> derived1 = forkRecordBuilder.childRecord("testDerived1");
+    RecordEnvelope<String> derived2 = forkRecordBuilder.childRecord("testDerived2");
+    RecordEnvelope<String> derived3 = derived2.withRecord("testDerived3");
+
+
+    forkRecordBuilder.close();
+
+    record.setRecordMetadata("meta2", "value2");
+    derived1.setRecordMetadata("meta3", "value3");
+    derived2.setRecordMetadata("meta4", "value4");
+    derived3.setRecordMetadata("meta5", "value5");
+
+    // clones should inherit the metadata at the time of the copy
+    Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(derived1.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(derived2.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(derived3.getRecordMetadata("meta1"), "value1");
+
+    // new entries should not affect any copies
+    Assert.assertEquals(record.getRecordMetadata("meta2"), "value2");
+    Assert.assertNull(derived1.getRecordMetadata("meta2"));
+    Assert.assertNull(derived2.getRecordMetadata("meta2"));
+    Assert.assertNull(derived3.getRecordMetadata("meta2"));
+
+    Assert.assertEquals(derived1.getRecordMetadata("meta3"), "value3");
+    Assert.assertNull(record.getRecordMetadata("meta3"));
+    Assert.assertNull(derived2.getRecordMetadata("meta3"));
+    Assert.assertNull(derived3.getRecordMetadata("meta3"));
+
+    Assert.assertEquals(derived2.getRecordMetadata("meta4"), "value4");
+    Assert.assertNull(derived1.getRecordMetadata("meta4"));
+    Assert.assertNull(derived3.getRecordMetadata("meta4"));
+    Assert.assertNull(record.getRecordMetadata("meta4"));
+
+    Assert.assertEquals(derived3.getRecordMetadata("meta5"), "value5");
+    Assert.assertNull(derived1.getRecordMetadata("meta5"));
+    Assert.assertNull(derived2.getRecordMetadata("meta5"));
+    Assert.assertNull(record.getRecordMetadata("meta5"));
+
+    // no deep copy for values
+    ((List)record.getRecordMetadata("list")).add("item2");
+    Assert.assertEquals(record.getRecordMetadata("list"), list);
+    Assert.assertEquals(derived1.getRecordMetadata("list"), list);
+    Assert.assertEquals(derived2.getRecordMetadata("list"), list);
+    Assert.assertEquals(derived3.getRecordMetadata("list"), list);
+  }
+
+  @Test
+  public void testRecordMetadataWithClones() {
+    RecordEnvelope<String> record = new RecordEnvelope<>("test", new MyWatermark(110));
+
+    record.setRecordMetadata("meta1", "value1");
+    List list = new ArrayList();
+    list.add("item1");
+    record.setRecordMetadata("list", list);
+
+    StreamEntity.ForkCloner cloner = record.forkCloner();
+
+    RecordEnvelope<String> copy1 = (RecordEnvelope<String>) cloner.getClone();
+    RecordEnvelope<String> copy2 = (RecordEnvelope<String>) cloner.getClone();
+    cloner.close();
+    RecordEnvelope<String> copy3 = (RecordEnvelope<String>)record.buildClone();
+
+
+    record.setRecordMetadata("meta2", "value2");
+    copy1.setRecordMetadata("meta3", "value3");
+    copy2.setRecordMetadata("meta4", "value4");
+    copy3.setRecordMetadata("meta5", "value5");
+
+    // clones should inherit the metadata at the time of the copy
+    Assert.assertEquals(record.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(copy1.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(copy2.getRecordMetadata("meta1"), "value1");
+    Assert.assertEquals(copy3.getRecordMetadata("meta1"), "value1");
+
+    // new entries should not affect any copies
+    Assert.assertEquals(record.getRecordMetadata("meta2"), "value2");
+    Assert.assertNull(copy1.getRecordMetadata("meta2"));
+    Assert.assertNull(copy2.getRecordMetadata("meta2"));
+    Assert.assertNull(copy3.getRecordMetadata("meta2"));
+
+    Assert.assertEquals(copy1.getRecordMetadata("meta3"), "value3");
+    Assert.assertNull(record.getRecordMetadata("meta3"));
+    Assert.assertNull(copy2.getRecordMetadata("meta3"));
+    Assert.assertNull(copy3.getRecordMetadata("meta3"));
+
+    Assert.assertEquals(copy2.getRecordMetadata("meta4"), "value4");
+    Assert.assertNull(copy1.getRecordMetadata("meta4"));
+    Assert.assertNull(copy3.getRecordMetadata("meta4"));
+    Assert.assertNull(record.getRecordMetadata("meta4"));
+
+    Assert.assertEquals(copy3.getRecordMetadata("meta5"), "value5");
+    Assert.assertNull(copy1.getRecordMetadata("meta5"));
+    Assert.assertNull(copy2.getRecordMetadata("meta5"));
+    Assert.assertNull(record.getRecordMetadata("meta5"));
+
+    // no deep copy for values
+    ((List)record.getRecordMetadata("list")).add("item2");
+    Assert.assertEquals(record.getRecordMetadata("list"), list);
+    Assert.assertEquals(copy1.getRecordMetadata("list"), list);
+    Assert.assertEquals(copy2.getRecordMetadata("list"), list);
+    Assert.assertEquals(copy3.getRecordMetadata("list"), list);
+  }
+
   @AllArgsConstructor
   public static class MyWatermark implements CheckpointableWatermark {
     @Getter