You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/05/21 03:55:44 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-774] Send nack when a control message handler fails in Fork

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c327be  [GOBBLIN-774] Send nack when a control message handler fails in Fork
6c327be is described below

commit 6c327bea2d3da1122c349ad698903be1c35dcf82
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon May 20 20:55:36 2019 -0700

    [GOBBLIN-774] Send nack when a control message handler fails in Fork
    
    Closes #2639 from
    htran1/control_message_nack_in_fork
---
 .../records/FlushControlMessageHandler.java        |  2 +-
 .../java/org/apache/gobblin/runtime/fork/Fork.java | 10 ++-
 .../apache/gobblin/runtime/TestRecordStream.java   | 95 +++++++++++++++++++++-
 3 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java b/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
index 3047a2d..081d87c 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
@@ -27,7 +27,7 @@ import org.apache.gobblin.stream.FlushControlMessage;
  * Flush control message handler that will flush a {@link Flushable} when handling a {@link FlushControlMessage}
  */
 public class FlushControlMessageHandler implements ControlMessageHandler {
-  Flushable flushable;
+  protected Flushable flushable;
 
   /**
    * Create a flush control message that will flush the given {@link Flushable}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 7ce2bdf..bbe26dd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -217,7 +217,15 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
         if (r instanceof RecordEnvelope) {
           this.writer.get().writeEnvelope((RecordEnvelope) r);
         } else if (r instanceof ControlMessage) {
-          this.writer.get().getMessageHandler().handleMessage((ControlMessage) r);
+          // Nack with error and reraise the error if the control message handling raises an error.
+          // This is to avoid missing an ack/nack in the error path.
+          try {
+            this.writer.get().getMessageHandler().handleMessage((ControlMessage) r);
+          } catch (Throwable error) {
+            r.nack(error);
+            throw error;
+          }
+
           r.ack();
         }
       }, e -> logger.error("Failed to process record.", e),
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index 620e401..d48349a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.Flushable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,6 +34,7 @@ import org.testng.annotations.Test;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.ack.Ackable;
 import org.apache.gobblin.ack.BasicAckableForTesting;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -117,6 +119,40 @@ public class TestRecordStream {
     Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called", "flush called"));
   }
 
+  @Test
+  public void testFlushFailure() throws Exception {
+    FlushAckable flushAckable1 = new FlushAckable();
+    FlushAckable flushAckable2 = new FlushAckable();
+
+    MyExtractor extractor = new MyExtractor(new StreamEntity[]{new RecordEnvelope<>("a"),
+        FlushControlMessage.builder().flushReason("flush1").build().addCallBack(flushAckable1), new RecordEnvelope<>("b"),
+        FlushControlMessage.builder().flushReason("flushFail1").build().addCallBack(flushAckable2)});
+    MyConverter converter = new MyConverter();
+    MyFlushDataWriter writer = new MyFlushDataWriter();
+
+    Task task = setupTask(extractor, writer, converter);
+
+    task.run();
+
+    // first flush should succeed, but second one should fail
+    Throwable error = flushAckable1.waitForAck();
+    Assert.assertNull(error);
+
+    error = flushAckable2.waitForAck();
+    Assert.assertNotNull(error);
+
+    task.commit();
+    Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
+
+    Assert.assertEquals(converter.records, Lists.newArrayList("a", "b"));
+    Assert.assertEquals(converter.messages, Lists.newArrayList(
+        FlushControlMessage.builder().flushReason("flush1").build(),
+        FlushControlMessage.builder().flushReason("flushFail1").build()));
+
+    Assert.assertEquals(writer.records, Lists.newArrayList("a", "b"));
+    Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called"));
+  }
+
   /**
    * Test of metadata update control messages that signal the converters to change schemas
    * @throws Exception
@@ -489,12 +525,69 @@ public class TestRecordStream {
     }
   }
 
+  static class MyFlushControlMessageHandler extends FlushControlMessageHandler {
+    public MyFlushControlMessageHandler(Flushable flushable) {
+      super(flushable);
+    }
+
+    @Override
+    public void handleMessage(ControlMessage message) {
+      if (message instanceof FlushControlMessage) {
+        if (((FlushControlMessage) message).getFlushReason().contains("Fail")) {
+          throw new RuntimeException("Flush failed: " + ((FlushControlMessage) message).getFlushReason());
+        }
+
+        try {
+          flushable.flush();
+        } catch (IOException e) {
+          throw new RuntimeException("Could not flush when handling FlushControlMessage", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * {@link Ackable} for waiting for the flush control message to be processed
+   */
+  private static class FlushAckable implements Ackable {
+    private Throwable error;
+    private final CountDownLatch processed;
+
+    public FlushAckable() {
+      this.processed = new CountDownLatch(1);
+    }
+
+    @Override
+    public void ack() {
+      this.processed.countDown();
+    }
+
+    @Override
+    public void nack(Throwable error) {
+      this.error = error;
+      this.processed.countDown();
+    }
+
+    /**
+     * Wait for ack
+     * @return any error encountered
+     */
+    public Throwable waitForAck() {
+      try {
+        this.processed.await();
+        return this.error;
+      } catch (InterruptedException e) {
+        throw new RuntimeException("interrupted while waiting for ack");
+      }
+    }
+  }
+
   static class MyFlushDataWriter extends MyDataWriter {
     private List<String> flush_messages = new ArrayList<>();
 
     @Override
     public ControlMessageHandler getMessageHandler() {
-      return new FlushControlMessageHandler(this);
+      return new MyFlushControlMessageHandler(this);
     }
 
     @Override