You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/08/09 16:04:17 UTC

incubator-gobblin git commit: [GOBBLIN-184] Call the flush method of CloseOnFlushWriterWrapper when a FlushControlMessage is received

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a28ff2185 -> f502cbee3


[GOBBLIN-184] Call the flush method of CloseOnFlushWriterWrapper when a FlushControlMessage is received

Closes #2040 from htran1/close_on_flush_handler_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f502cbee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f502cbee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f502cbee

Branch: refs/heads/master
Commit: f502cbee33282c89438613c5882018006c5f3225
Parents: a28ff21
Author: Hung Tran <hu...@linkedin.com>
Authored: Wed Aug 9 09:04:03 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Aug 9 09:04:03 2017 -0700

----------------------------------------------------------------------
 .../writer/CloseOnFlushWriterWrapper.java       |  9 +++-
 .../writer/CloseOnFlushWriterWrapperTest.java   | 46 +++++++++++++++-----
 2 files changed, 42 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 2c81e52..c244b2d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.records.FlushControlMessageHandler;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.Decorator;
 import org.apache.gobblin.util.FinalState;
@@ -126,7 +127,13 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
 
   @Override
   public ControlMessageHandler getMessageHandler() {
-    return this.writer.getMessageHandler();
+    // if close on flush is configured then create a handler that will invoke the wrapper's flush to perform close
+    // on flush operations, otherwise return the wrapped writer's handler.
+    if (this.closeOnFlush) {
+      return new FlushControlMessageHandler(this);
+    } else {
+      return this.writer.getMessageHandler();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
index 11b2274..b14793a 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java
@@ -26,6 +26,9 @@ import org.testng.annotations.Test;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.records.ControlMessageHandler;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.FlushControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 
 public class CloseOnFlushWriterWrapperTest {
@@ -40,12 +43,13 @@ public class CloseOnFlushWriterWrapperTest {
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.flush();
+    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertEquals(dummyWriters.get(0).closed, false);
-    Assert.assertEquals(dummyWriters.get(0).committed, false);
+    Assert.assertFalse(dummyWriters.get(0).closed);
+    Assert.assertFalse(dummyWriters.get(0).committed);
+    Assert.assertTrue(dummyWriters.get(0).handlerCalled);
   }
 
   @Test
@@ -59,12 +63,14 @@ public class CloseOnFlushWriterWrapperTest {
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.flush();
+    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
 
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertEquals(dummyWriters.get(0).closed, true);
-    Assert.assertEquals(dummyWriters.get(0).committed, true);
+    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+    // handler from CloseOnFlushWriterWrapper should have been called instead
+    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
   }
 
   @Test
@@ -78,22 +84,24 @@ public class CloseOnFlushWriterWrapperTest {
     byte[] record = new byte[]{'a', 'b', 'c', 'd'};
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.flush();
+    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
 
     Assert.assertEquals(dummyWriters.size(), 1);
     Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
-    Assert.assertEquals(dummyWriters.get(0).closed, true);
-    Assert.assertEquals(dummyWriters.get(0).committed, true);
+    Assert.assertTrue(dummyWriters.get(0).closed);
+    Assert.assertTrue(dummyWriters.get(0).committed);
+    Assert.assertFalse(dummyWriters.get(0).handlerCalled);
 
     writer.writeEnvelope(new RecordEnvelope(record));
-    writer.flush();
+    writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));
 
     Assert.assertEquals(dummyWriters.size(), 2);
     Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
     Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
-    Assert.assertEquals(dummyWriters.get(1).closed, true);
-    Assert.assertEquals(dummyWriters.get(1).committed, true);
+    Assert.assertTrue(dummyWriters.get(1).closed);
+    Assert.assertTrue(dummyWriters.get(1).committed);
+    Assert.assertFalse(dummyWriters.get(1).handlerCalled);
   }
 
   private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
@@ -113,6 +121,7 @@ public class CloseOnFlushWriterWrapperTest {
     private int flushCount = 0;
     private boolean committed = false;
     private boolean closed = false;
+    private boolean handlerCalled = false;
 
     DummyWriter() {
     }
@@ -153,6 +162,19 @@ public class CloseOnFlushWriterWrapperTest {
     }
 
     @Override
+    public ControlMessageHandler getMessageHandler() {
+      return new ControlMessageHandler() {
+        @Override
+        public void handleMessage(ControlMessage message) {
+          handlerCalled = true;
+          if (message instanceof FlushControlMessage) {
+            flush();
+          }
+        }
+      };
+    }
+
+    @Override
     public void flush() {
       this.flushCount++;
     }