You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/09 16:04:17 UTC
incubator-gobblin git commit: [GOBBLIN-184] Call the flush method of
CloseOnFlushWriterWrapper when a FlushControlMessage is received
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a28ff2185 -> f502cbee3
[GOBBLIN-184] Call the flush method of CloseOnFlushWriterWrapper when a FlushControlMessage is received
Closes #2040 from
htran1/close_on_flush_handler_fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f502cbee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f502cbee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f502cbee
Branch: refs/heads/master
Commit: f502cbee33282c89438613c5882018006c5f3225
Parents: a28ff21
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Aug 9 09:04:03 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Aug 9 09:04:03 2017 -0700
----------------------------------------------------------------------
.../writer/CloseOnFlushWriterWrapper.java | 9 +++-
.../writer/CloseOnFlushWriterWrapperTest.java | 46 +++++++++++++++-----
2 files changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 2c81e52..c244b2d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.Decorator;
import org.apache.gobblin.util.FinalState;
@@ -126,7 +127,13 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
@Override
public ControlMessageHandler getMessageHandler() {
- return this.writer.getMessageHandler();
+ // if close on flush is configured then create a handler that will invoke the wrapper's flush to perform close
+ // on flush operations, otherwise return the wrapped writer's handler.
+ if (this.closeOnFlush) {
+ return new FlushControlMessageHandler(this);
+ } else {
+ return this.writer.getMessageHandler();
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index 11b2274..b14793a 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -26,6 +26,9 @@ import org.testng.annotations.Test;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
public class CloseOnFlushWriterWrapperTest {
@@ -40,12 +43,13 @@ public class CloseOnFlushWriterWrapperTest {
byte[] record = new byte[]{'a', 'b', 'c', 'd'};
writer.writeEnvelope(new RecordEnvelope(record));
- writer.flush();
+ writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
- Assert.assertEquals(dummyWriters.get(0).closed, false);
- Assert.assertEquals(dummyWriters.get(0).committed, false);
+ Assert.assertFalse(dummyWriters.get(0).closed);
+ Assert.assertFalse(dummyWriters.get(0).committed);
+ Assert.assertTrue(dummyWriters.get(0).handlerCalled);
}
@Test
@@ -59,12 +63,14 @@ public class CloseOnFlushWriterWrapperTest {
byte[] record = new byte[]{'a', 'b', 'c', 'd'};
writer.writeEnvelope(new RecordEnvelope(record));
- writer.flush();
+ writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
- Assert.assertEquals(dummyWriters.get(0).closed, true);
- Assert.assertEquals(dummyWriters.get(0).committed, true);
+ Assert.assertTrue(dummyWriters.get(0).closed);
+ Assert.assertTrue(dummyWriters.get(0).committed);
+ // handler from CloseOnFlushWriterWrapper should have been called instead
+ Assert.assertFalse(dummyWriters.get(0).handlerCalled);
}
@Test
@@ -78,22 +84,24 @@ public class CloseOnFlushWriterWrapperTest {
byte[] record = new byte[]{'a', 'b', 'c', 'd'};
writer.writeEnvelope(new RecordEnvelope(record));
- writer.flush();
+ writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
Assert.assertEquals(dummyWriters.size(), 1);
Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
- Assert.assertEquals(dummyWriters.get(0).closed, true);
- Assert.assertEquals(dummyWriters.get(0).committed, true);
+ Assert.assertTrue(dummyWriters.get(0).closed);
+ Assert.assertTrue(dummyWriters.get(0).committed);
+ Assert.assertFalse(dummyWriters.get(0).handlerCalled);
writer.writeEnvelope(new RecordEnvelope(record));
- writer.flush();
+ writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
Assert.assertEquals(dummyWriters.size(), 2);
Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
- Assert.assertEquals(dummyWriters.get(1).closed, true);
- Assert.assertEquals(dummyWriters.get(1).committed, true);
+ Assert.assertTrue(dummyWriters.get(1).closed);
+ Assert.assertTrue(dummyWriters.get(1).committed);
+ Assert.assertFalse(dummyWriters.get(1).handlerCalled);
}
private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
@@ -113,6 +121,7 @@ public class CloseOnFlushWriterWrapperTest {
private int flushCount = 0;
private boolean committed = false;
private boolean closed = false;
+ private boolean handlerCalled = false;
DummyWriter() {
}
@@ -153,6 +162,19 @@ public class CloseOnFlushWriterWrapperTest {
}
@Override
+ public ControlMessageHandler getMessageHandler() {
+ return new ControlMessageHandler() {
+ @Override
+ public void handleMessage(ControlMessage message) {
+ handlerCalled = true;
+ if (message instanceof FlushControlMessage) {
+ flush();
+ }
+ }
+ };
+ }
+
+ @Override
public void flush() {
this.flushCount++;
}