You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/09 00:44:09 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4c1fd44 [GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]
4c1fd44 is described below
commit 4c1fd444d05ead53d5df9999d2263035f623bdd3
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Apr 8 17:44:03 2019 -0700
[GOBBLIN-727] Skip commit in CloseOnFlushWriterWrapper if a commit has already been invoked on the underlying writer.[]
Closes #2594 from sv2000/closeOnFlushWriterWrapper
---
.../org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
index 5e912bb..bc6c9b9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java
@@ -57,6 +57,8 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
private DataWriter<D> writer;
private final Supplier<DataWriter<D>> writerSupplier;
private boolean closed;
+ private boolean committed;
+
// is the close functionality enabled?
private final boolean closeOnFlush;
private final ControlMessageHandler controlMessageHandler;
@@ -90,6 +92,7 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
if (this.closed) {
this.writer = writerSupplier.get();
this.closed = false;
+ this.committed = false;
}
this.writer.writeEnvelope(record);
}
@@ -104,13 +107,15 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De
@Override
public void commit() throws IOException {
- writer.commit();
+ if (!this.committed) {
+ writer.commit();
+ this.committed = true;
+ }
}
@Override
public void cleanup() throws IOException {
writer.cleanup();
-
}
@Override