You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/08/15 01:50:47 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-859] let writer pass latest schema to WorkUnitState
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c7e4a6e [GOBBLIN-859] let writer pass latest schema to WorkUnitState
c7e4a6e is described below
commit c7e4a6efbb08c433c2be69958eebae3b328e347a
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Wed Aug 14 18:50:40 2019 -0700
[GOBBLIN-859] let writer pass latest schema to WorkUnitState
Closes #2714 from ZihanLi58/GOBBLIN-859
---
.../apache/gobblin/writer/PartitionedDataWriter.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3dad0ef..3181bd6 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -48,7 +48,6 @@ import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
@@ -67,6 +66,7 @@ import org.apache.gobblin.writer.partitioner.WriterPartitioner;
@Slf4j
public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter<D> {
+ public static final String WRITER_LATEST_SCHEMA = "writer.latest.schema";
private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
@@ -96,6 +96,9 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
this.closer = Closer.create();
this.writerBuilder = builder;
this.controlMessageHandler = new PartitionDataWriterMessageHandler();
+ if(builder.schema != null) {
+ this.state.setProp(WRITER_LATEST_SCHEMA, builder.getSchema());
+ }
this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() {
@Override
public DataWriter<D> load(final GenericRecord key)
@@ -118,19 +121,19 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
Preconditions.checkArgument(builder instanceof PartitionAwareDataWriterBuilder, String
- .format("%s was specified but the writer %s does not support partitioning.",
- ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));
+ .format("%s was specified but the writer %s does not support partitioning.",
+ ConfigurationKeys.WRITER_PARTITIONER_CLASS, builder.getClass().getCanonicalName()));
try {
this.shouldPartition = true;
this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(builder));
this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils
- .invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
- builder.getBranches(), builder.getBranch())));
+ .invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state,
+ builder.getBranches(), builder.getBranch())));
Preconditions
.checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String
- .format("Writer %s does not support schema from partitioner %s",
- builder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
+ .format("Writer %s does not support schema from partitioner %s",
+ builder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
} catch (ReflectiveOperationException roe) {
throw new IOException(roe);
}
@@ -321,6 +324,8 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
if (message instanceof MetadataUpdateControlMessage) {
PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage) message)
.getGlobalMetadata().getSchema());
+ state.setProp(WRITER_LATEST_SCHEMA, ((MetadataUpdateControlMessage) message)
+ .getGlobalMetadata().getSchema());
}
for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) {