You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2017/08/25 17:32:04 UTC
asterixdb git commit: [ASTERIXDB-2067][ING] Handle Failures in
Controller Flush
Repository: asterixdb
Updated Branches:
refs/heads/master 979012d5e -> 4f79620cd
[ASTERIXDB-2067][ING] Handle Failures in Controller Flush
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Failures that happen in a feed while reading from external
sources allow the ingestion pipeline to close gracefully,
pushing parsed records in the frame forward before
failing.
- There was an assumption that when hasNext() or next()
is called on a data reader and fails, the failure
does not affect the integrity of the pipeline.
- This assumption is incorrect, as hasNext() and next()
can themselves flush the pipeline; if the failure
happens during the flush call, the pipeline must be
failed.
Change-Id: Ib9be729088bd94338ef2353333eaea34ba3da99f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1968
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4f79620c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4f79620c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4f79620c
Branch: refs/heads/master
Commit: 4f79620cd71bd750548a749db352c418fbadddae
Parents: 979012d
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Thu Aug 24 17:33:29 2017 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Aug 25 10:31:38 2017 -0700
----------------------------------------------------------------------
.../org/apache/asterix/app/active/RecoveryTask.java | 3 +++
.../dataflow/AbstractFeedDataFlowController.java | 3 +++
.../dataflow/FeedRecordDataFlowController.java | 16 ++++++++++++++--
3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 7b7de93..73439ce 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -171,6 +171,9 @@ public class RecoveryTask {
DatasetUtil.getFullyQualifiedName(dataset));
}
synchronized (listener) {
+ if (cancelRecovery) {
+ return null;
+ }
if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
listener.setState(ActivityState.PERMANENTLY_FAILED);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 53fa137..3437de1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -30,6 +30,7 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
protected final int numOfFields;
protected final ArrayTupleBuilder tb;
protected final FeedLogManager feedLogManager;
+ protected boolean flushing;
public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfFields) {
@@ -54,7 +55,9 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl
@Override
public void flush() throws HyracksDataException {
+ flushing = true;
tupleForwarder.flush();
+ flushing = false;
}
public abstract String getStats();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4f79620c/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 4ed1b08..c85e236 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -131,12 +131,18 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
return state;
}
- private IRawRecord<? extends T> next() throws HyracksDataException {
+ private IRawRecord<? extends T> next() throws Exception {
try {
return recordReader.next();
} catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+ if (flushing) {
+ throw e;
+ }
throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
} catch (Exception e) {
+ if (flushing) {
+ throw e;
+ }
if (!recordReader.handleException(e)) {
throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
}
@@ -144,13 +150,19 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
}
}
- private boolean hasNext() throws HyracksDataException {
+ private boolean hasNext() throws Exception {
while (true) {
try {
return recordReader.hasNext();
} catch (InterruptedException e) { // NOSONAR Gracefully handling interrupt to push records in the pipeline
+ if (flushing) {
+ throw e;
+ }
throw new RuntimeDataException(ErrorCode.FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD, e);
} catch (Exception e) {
+ if (flushing) {
+ throw e;
+ }
if (!recordReader.handleException(e)) {
throw new RuntimeDataException(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD, e);
}