You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/15 00:02:02 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-736] Skip flush and control message handlers on closed writers in the CloseOnFlushWriterWrapper

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f16462  [GOBBLIN-736] Skip flush and control message handlers on closed writers in the CloseOnFlushWriterWrapper
0f16462 is described below

commit 0f164628b350e0870268c9e088053c4469c2e9fa
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Sun Apr 14 17:01:55 2019 -0700

    [GOBBLIN-736] Skip flush and control message handlers on closed writers in the CloseOnFlushWriterWrapper
    
    Closes #2602 from htran1/close_on_flush_fix
---
 .../gobblin/writer/CloseOnFlushWriterWrapper.java  | 10 ++++
 .../writer/CloseOnFlushWriterWrapperTest.java      | 56 ++++++++++++++++++++++
 2 files changed, 66 insertions(+)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index bc6c9b9..9ff6407 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -169,6 +169,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
   }
 
   private void flush(boolean close) throws IOException {
+    // nothing to flush, so don't call flush on the underlying writer since it may not support flush after close
+    if (this.closed) {
+      return;
+    }
+
     this.writer.flush();
 
     // commit data then close the writer
@@ -184,6 +189,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
   private class CloseOnFlushWriterMessageHandler implements ControlMessageHandler {
     @Override
     public void handleMessage(ControlMessage message) {
+      // nothing to do if already closed, so don't call then underlying handler since it may not work on closed objects
+      if (CloseOnFlushWriterWrapper.this.closed) {
+        return;
+      }
+
       ControlMessageHandler underlyingHandler = CloseOnFlushWriterWrapper.this.writer.getMessageHandler();
 
       // let underlying writer handle the control messages first
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index ef435b1..1246f39 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -130,6 +130,62 @@ public class CloseOnFlushWriterWrapperTest {
     Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
   }
 
+  @Test
+  public void testDirectFlushAfterFlush()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope(record));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+    writer.flush();
+    writer.close();
+
+    // writer should not be flushed or closed multiple times
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+  }
+
+  @Test
+  public void testBackToBackFlushMessages()
+      throws IOException {
+    WorkUnitState state = new WorkUnitState();
+    state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+    List<DummyWriter> dummyWriters = new ArrayList<>();
+    CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+    byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+    writer.writeEnvelope(new RecordEnvelope(record));
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+    Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+    Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+    writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+    // a flush control message on a closed writer should be a noop
+    Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+    writer.close();
+
+    // writer should not be closed multiple times
+    Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+  }
 
   private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
     return new CloseOnFlushWriterWrapper<>(new Supplier<DataWriter<byte[]>>() {