You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/28 18:46:50 UTC
incubator-gobblin git commit: [GOBBLIN-225] Fix cloning of
ControlMessages in PartitionDataWriterMessageHandler
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 4e9453fba -> d769b2144
[GOBBLIN-225] Fix cloning of ControlMessages in PartitionDataWriterMessageHandler
Closes #2079 from htran1/partition_handler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d769b214
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d769b214
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d769b214
Branch: refs/heads/master
Commit: d769b2144c7239a4c084c7ba65daadf37903b2ae
Parents: 4e9453f
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Aug 28 11:46:45 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 28 11:46:45 2017 -0700
----------------------------------------------------------------------
.../gobblin/writer/PartitionedDataWriter.java | 8 ++++-
.../gobblin/writer/PartitionedWriterTest.java | 37 ++++++++++++++++++++
2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index d915730..a667b86 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -45,6 +45,7 @@ import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.writer.partitioner.WriterPartitioner;
@@ -330,9 +331,14 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
private class PartitionDataWriterMessageHandler implements ControlMessageHandler {
@Override
public void handleMessage(ControlMessage message) {
+ StreamEntity.ForkCloner cloner = message.forkCloner();
+
for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) {
- writer.getMessageHandler().handleMessage((ControlMessage) message.getSingleClone());
+ ControlMessage cloned = (ControlMessage) cloner.getClone();
+ writer.getMessageHandler().handleMessage(cloned);
}
+
+ cloner.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 3d5923d..c00c823 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.ack.BasicAckableForTesting;
+import org.apache.gobblin.stream.FlushControlMessage;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -164,4 +166,39 @@ public class PartitionedWriterTest {
}
}
+ @Test
+ public void testControlMessageHandler() throws IOException {
+
+ State state = new State();
+ state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName());
+
+ TestPartitionAwareWriterBuilder builder = new TestPartitionAwareWriterBuilder();
+
+ PartitionedDataWriter writer = new PartitionedDataWriter<String, String>(builder, state);
+
+ Assert.assertEquals(builder.actions.size(), 0);
+
+ String record1 = "abc";
+ writer.writeEnvelope(new RecordEnvelope(record1));
+
+ String record2 = "123";
+ writer.writeEnvelope(new RecordEnvelope(record2));
+
+ FlushControlMessage controlMessage = new FlushControlMessage<>(new FlushControlMessage.FlushReason("test"));
+ BasicAckableForTesting ackable = new BasicAckableForTesting();
+
+ controlMessage.addCallBack(ackable);
+ Assert.assertEquals(ackable.acked, 0);
+
+ // when the control message is cloned properly then this does not raise an error
+ writer.getMessageHandler().handleMessage(controlMessage);
+
+ // message handler does not ack since consumeRecordStream does acking for control messages
+ // this should be revisited when control message error handling is changed
+ controlMessage.ack();
+
+ Assert.assertEquals(ackable.acked, 1);
+
+ writer.close();
+ }
}