You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/15 01:50:47 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-859] let writer pass latest schema to WorkUnitState

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c7e4a6e  [GOBBLIN-859] let writer pass latest schema to WorkUnitState
c7e4a6e is described below

commit c7e4a6efbb08c433c2be69958eebae3b328e347a
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Aug 14 18:50:40 2019 -0700

    [GOBBLIN-859] let writer pass latest schema to WorkUnitState
    
    Closes #2714 from ZihanLi58/GOBBLIN-859
---
 .../apache/gobblin/writer/PartitionedDataWriter.java  | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3dad0ef..3181bd6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -48,7 +48,6 @@ import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
 import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
 import org.apache.gobblin.records.ControlMessageHandler;
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -67,6 +66,7 @@ import org.apache.gobblin.writer.partitioner.WriterPartitioner;
 @Slf4j
 public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter<D> {
 
+  public static final String WRITER_LATEST_SCHEMA = "writer.latest.schema";
   private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
       new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
 
@@ -96,6 +96,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
     this.closer = Closer.create();
     this.writerBuilder = builder;
     this.controlMessageHandler = new PartitionDataWriterMessageHandler();
+    if(builder.schema != null) {
+      this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
+    }
     this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() {
       @Override
       public DataWriter<D> load(final GenericRecord key)
@@ -118,19 +121,19 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
 
     if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
       Preconditions.checkArgument(builder instanceof PartitionAwareDataWriterBuilder, String
-              .format("%s was specified but the writer %s does not support partitioning.",
-                  ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));
+          .format("%s was specified but the writer %s does not support partitioning.",
+              ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));
 
       try {
         this.shouldPartition = true;
         this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
         this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils
-                .invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
-                    builder.getBranches(), builder.getBranch())));
+            .invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
+                builder.getBranches(), builder.getBranch())));
         Preconditions
             .checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String
-                    .format("Writer %s does not support schema from partitioner %s",
-                        builder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
+                .format("Writer %s does not support schema from partitioner %s",
+                    builder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
       } catch (ReflectiveOperationException roe) {
         throw new IOException(roe);
       }
@@ -321,6 +324,8 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
       if (message instanceof MetadataUpdateControlMessage) {
         PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage) message)
             .getGlobalMetadata().getSchema());
+        state.setProp(WRITER_LATEST_SCHEMA, ((MetadataUpdateControlMessage) message)
+            .getGlobalMetadata().getSchema());
       }
 
       for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) {