You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/28 18:46:50 UTC

incubator-gobblin git commit: [GOBBLIN-225] Fix cloning of ControlMessages in PartitionDataWriterMessageHandler

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 4e9453fba -> d769b2144


[GOBBLIN-225] Fix cloning of ControlMessages in PartitionDataWriterMessageHandler

Closes #2079 from htran1/partition_handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d769b214
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d769b214
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d769b214

Branch: refs/heads/master
Commit: d769b2144c7239a4c084c7ba65daadf37903b2ae
Parents: 4e9453f
Author: Hung Tran <hu...@linkedin.com>
Authored: Mon Aug 28 11:46:45 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Aug 28 11:46:45 2017 -0700

----------------------------------------------------------------------
 .../gobblin/writer/PartitionedDataWriter.java   |  8 ++++-
 .../gobblin/writer/PartitionedWriterTest.java   | 37 ++++++++++++++++++++
 2 files changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index d915730..a667b86 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -45,6 +45,7 @@ import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.source.extractor.CheckpointableWatermark;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.FinalState;
 import org.apache.gobblin.writer.partitioner.WriterPartitioner;
@@ -330,9 +331,14 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
   private class PartitionDataWriterMessageHandler implements ControlMessageHandler {
     @Override
     public void handleMessage(ControlMessage message) {
+      StreamEntity.ForkCloner cloner = message.forkCloner();
+
       for (DataWriter writer : PartitionedDataWriter.this.partitionWriters.asMap().values()) {
-        writer.getMessageHandler().handleMessage((ControlMessage) message.getSingleClone());
+        ControlMessage cloned = (ControlMessage) cloner.getClone();
+        writer.getMessageHandler().handleMessage(cloned);
       }
+
+      cloner.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d769b214/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 3d5923d..c00c823 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -23,6 +23,8 @@ import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.ack.BasicAckableForTesting;
+import org.apache.gobblin.stream.FlushControlMessage;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -164,4 +166,39 @@ public class PartitionedWriterTest {
     }
   }
 
+  @Test
+  public void testControlMessageHandler() throws IOException {
+
+    State state = new State();
+    state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName());
+
+    TestPartitionAwareWriterBuilder builder = new TestPartitionAwareWriterBuilder();
+
+    PartitionedDataWriter writer = new PartitionedDataWriter<String, String>(builder, state);
+
+    Assert.assertEquals(builder.actions.size(), 0);
+
+    String record1 = "abc";
+    writer.writeEnvelope(new RecordEnvelope(record1));
+
+    String record2 = "123";
+    writer.writeEnvelope(new RecordEnvelope(record2));
+
+    FlushControlMessage controlMessage = new FlushControlMessage<>(new FlushControlMessage.FlushReason("test"));
+    BasicAckableForTesting ackable = new BasicAckableForTesting();
+
+    controlMessage.addCallBack(ackable);
+    Assert.assertEquals(ackable.acked, 0);
+
+    // when the control message is cloned properly then this does not raise an error
+    writer.getMessageHandler().handleMessage(controlMessage);
+
+    // message handler does not ack since consumeRecordStream does acking for control messages
+    // this should be revisited when control message error handling is changed
+    controlMessage.ack();
+
+    Assert.assertEquals(ackable.acked, 1);
+
+    writer.close();
+  }
 }