You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/09 00:44:09 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c1fd44  [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]
4c1fd44 is described below

commit 4c1fd444d05ead53d5df9999d2263035f623bdd3
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Apr 8 17:44:03 2019 -0700

    [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]
    
    Closes #2594 from sv2000/closeOnFlushWriterWrapper
---
 .../org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 5e912bb..bc6c9b9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -57,6 +57,8 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
   private DataWriter<D> writer;
   private final Supplier<DataWriter<D>> writerSupplier;
   private boolean closed;
+  private boolean committed;
+
   // is the close functionality enabled?
   private final boolean closeOnFlush;
   private final ControlMessageHandler controlMessageHandler;
@@ -90,6 +92,7 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
     if (this.closed) {
       this.writer = writerSupplier.get();
       this.closed = false;
+      this.committed = false;
     }
     this.writer.writeEnvelope(record);
   }
@@ -104,13 +107,15 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
 
   @Override
   public void commit() throws IOException {
-    writer.commit();
+    if (!this.committed) {
+      writer.commit();
+      this.committed = true;
+    }
   }
 
   @Override
   public void cleanup() throws IOException {
     writer.cleanup();
-
   }
 
   @Override