You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/05/21 03:55:44 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-774] Send nack when a control message handler fails in Fork
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6c327be [GOBBLIN-774] Send nack when a control message handler fails in Fork
6c327be is described below
commit 6c327bea2d3da1122c349ad698903be1c35dcf82
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Mon May 20 20:55:36 2019 -0700
[GOBBLIN-774] Send nack when a control message handler fails in Fork
Closes #2639 from htran1/control_message_nack_in_fork
---
.../records/FlushControlMessageHandler.java | 2 +-
.../java/org/apache/gobblin/runtime/fork/Fork.java | 10 ++-
.../apache/gobblin/runtime/TestRecordStream.java | 95 +++++++++++++++++++++-
3 files changed, 104 insertions(+), 3 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java b/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
index 3047a2d..081d87c 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/records/FlushControlMessageHandler.java
@@ -27,7 +27,7 @@ import org.apache.gobblin.stream.FlushControlMessage;
* Flush control message handler that will flush a {@link Flushable} when handling a {@link FlushControlMessage}
*/
public class FlushControlMessageHandler implements ControlMessageHandler {
- Flushable flushable;
+ protected Flushable flushable;
/**
* Create a flush control message that will flush the given {@link Flushable}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 7ce2bdf..bbe26dd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -217,7 +217,15 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
if (r instanceof RecordEnvelope) {
this.writer.get().writeEnvelope((RecordEnvelope) r);
} else if (r instanceof ControlMessage) {
- this.writer.get().getMessageHandler().handleMessage((ControlMessage) r);
+ // Nack with error and re-raise the error if the control message handling raises an error.
+ // This is to avoid missing an ack/nack in the error path.
+ try {
+ this.writer.get().getMessageHandler().handleMessage((ControlMessage) r);
+ } catch (Throwable error) {
+ r.nack(error);
+ throw error;
+ }
+
r.ack();
}
}, e -> logger.error("Failed to process record.", e),
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
index 620e401..d48349a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TestRecordStream.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.runtime;
+import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -33,6 +34,7 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.ack.BasicAckableForTesting;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -117,6 +119,40 @@ public class TestRecordStream {
Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called", "flush called"));
}
+ @Test
+ public void testFlushFailure() throws Exception {
+ FlushAckable successAck = new FlushAckable();
+ FlushAckable failureAck = new FlushAckable();
+
+ // record "a", a flush expected to succeed, record "b", then a flush expected to fail
+ MyExtractor extractor = new MyExtractor(new StreamEntity[]{
+ new RecordEnvelope<>("a"),
+ FlushControlMessage.builder().flushReason("flush1").build().addCallBack(successAck),
+ new RecordEnvelope<>("b"),
+ FlushControlMessage.builder().flushReason("flushFail1").build().addCallBack(failureAck)});
+ MyConverter converter = new MyConverter();
+ MyFlushDataWriter writer = new MyFlushDataWriter();
+
+ Task task = setupTask(extractor, writer, converter);
+ task.run();
+
+ // the "flush1" message must complete without an error ...
+ Assert.assertNull(successAck.waitForAck());
+ // ... while the "flushFail1" message must complete with one
+ Assert.assertNotNull(failureAck.waitForAck());
+
+ task.commit();
+ Assert.assertEquals(task.getTaskState().getWorkingState(), WorkUnitState.WorkingState.SUCCESSFUL);
+
+ // the converter is expected to have seen both records and both control messages
+ Assert.assertEquals(converter.records, Lists.newArrayList("a", "b"));
+ Assert.assertEquals(converter.messages, Lists.newArrayList(
+ FlushControlMessage.builder().flushReason("flush1").build(),
+ FlushControlMessage.builder().flushReason("flushFail1").build()));
+
+ // the writer is expected to have both records but only one recorded flush
+ Assert.assertEquals(writer.records, Lists.newArrayList("a", "b"));
+ Assert.assertEquals(writer.flush_messages, Lists.newArrayList("flush called"));
+ }
+
+
/**
* Test of metadata update control messages that signal the converters to change schemas
* @throws Exception
@@ -489,12 +525,69 @@ public class TestRecordStream {
}
}
+ static class MyFlushControlMessageHandler extends FlushControlMessageHandler {
+ public MyFlushControlMessageHandler(Flushable flushable) {
+ super(flushable);
+ }
+
+ /**
+ * Throws for any {@link FlushControlMessage} whose flush reason contains "Fail"; otherwise flushes
+ * the wrapped {@link Flushable}. Messages of any other type are ignored.
+ */
+ @Override
+ public void handleMessage(ControlMessage message) {
+ if (!(message instanceof FlushControlMessage)) {
+ return;
+ }
+
+ FlushControlMessage flushMessage = (FlushControlMessage) message;
+ if (flushMessage.getFlushReason().contains("Fail")) {
+ throw new RuntimeException("Flush failed: " + flushMessage.getFlushReason());
+ }
+
+ try {
+ this.flushable.flush();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Could not flush when handling FlushControlMessage", ioe);
+ }
+ }
+ }
+
+ /**
+ * {@link Ackable} for waiting for the flush control message to be processed
+ */
+ private static class FlushAckable implements Ackable {
+ // Written by nack() before countDown(); the latch's happens-before guarantee makes the value
+ // visible to the thread returning from await(), so the field does not need to be volatile.
+ private Throwable error;
+ private final CountDownLatch processed;
+
+ public FlushAckable() {
+ this.processed = new CountDownLatch(1);
+ }
+
+ @Override
+ public void ack() {
+ this.processed.countDown();
+ }
+
+ @Override
+ public void nack(Throwable error) {
+ this.error = error;
+ this.processed.countDown();
+ }
+
+ /**
+ * Block until the message has been acked or nacked.
+ * @return the error passed to {@link #nack(Throwable)}, or null if none was recorded
+ * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored
+ * and the {@link InterruptedException} is attached as the cause
+ */
+ public Throwable waitForAck() {
+ try {
+ this.processed.await();
+ return this.error;
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so callers further up the stack can still observe it,
+ // and keep the original exception as the cause instead of discarding it.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("interrupted while waiting for ack", e);
+ }
+ }
+ }
+
static class MyFlushDataWriter extends MyDataWriter {
private List<String> flush_messages = new ArrayList<>();
@Override
public ControlMessageHandler getMessageHandler() {
- return new FlushControlMessageHandler(this);
+ return new MyFlushControlMessageHandler(this);
}
@Override