You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/15 00:02:02 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-736] Skip flush
and control message handlers on closed writers in the
CloseOnFlushWriterWrapper
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0f16462 [GOBBLIN-736] Skip flush and control message handlers on closed writers in the CloseOnFlushWriterWrapper
0f16462 is described below
commit 0f164628b350e0870268c9e088053c4469c2e9fa
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Sun Apr 14 17:01:55 2019 -0700
[GOBBLIN-736] Skip flush and control message handlers on closed writers in the CloseOnFlushWriterWrapper
Closes #2602 from htran1/close_on_flush_fix
---
.../gobblin/writer/CloseOnFlushWriterWrapper.java | 10 ++++
.../writer/CloseOnFlushWriterWrapperTest.java | 56 ++++++++++++++++++++++
2 files changed, 66 insertions(+)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index bc6c9b9..9ff6407 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -169,6 +169,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
}
private void flush(boolean close) throws IOException {
+ // nothing to flush, so don't call flush on the underlying writer since it may not support flush after close
+ if (this.closed) {
+ return;
+ }
+
this.writer.flush();
// commit data then close the writer
@@ -184,6 +189,11 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
private class CloseOnFlushWriterMessageHandler implements ControlMessageHandler {
@Override
public void handleMessage(ControlMessage message) {
+ // nothing to do if already closed, so don't call then underlying handler since it may not work on closed objects
+ if (CloseOnFlushWriterWrapper.this.closed) {
+ return;
+ }
+
ControlMessageHandler underlyingHandler = CloseOnFlushWriterWrapper.this.writer.getMessageHandler();
// let underlying writer handle the control messages first
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index ef435b1..1246f39 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -130,6 +130,62 @@ public class CloseOnFlushWriterWrapperTest {
Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
}
+ @Test
+ public void testDirectFlushAfterFlush()
+ throws IOException {
+ WorkUnitState state = new WorkUnitState();
+ state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+ List<DummyWriter> dummyWriters = new ArrayList<>();
+ CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+ byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+ writer.writeEnvelope(new RecordEnvelope(record));
+ writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+ Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+ Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+ Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+ Assert.assertTrue(dummyWriters.get(0).committed);
+ Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+ writer.flush();
+ writer.close();
+
+ // writer should not be flushed or closed multiple times
+ Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+ Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+ }
+
+ @Test
+ public void testBackToBackFlushMessages()
+ throws IOException {
+ WorkUnitState state = new WorkUnitState();
+ state.getJobState().setProp(CloseOnFlushWriterWrapper.WRITER_CLOSE_ON_FLUSH_KEY, "true");
+ List<DummyWriter> dummyWriters = new ArrayList<>();
+ CloseOnFlushWriterWrapper<byte[]> writer = getCloseOnFlushWriter(dummyWriters, state);
+
+ byte[] record = new byte[]{'a', 'b', 'c', 'd'};
+
+ writer.writeEnvelope(new RecordEnvelope(record));
+ writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+ Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
+ Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
+ Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+ Assert.assertTrue(dummyWriters.get(0).committed);
+ Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+ writer.getMessageHandler().handleMessage(FlushControlMessage.builder().build());
+
+ // a flush control message on a closed writer should be a noop
+ Assert.assertEquals(dummyWriters.get(0).handlerCalled, 1);
+
+ writer.close();
+
+ // writer should not be closed multiple times
+ Assert.assertEquals(dummyWriters.get(0).closeCount, 1);
+ }
private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
return new CloseOnFlushWriterWrapper<>(new Supplier<DataWriter<byte[]>>() {