You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/06/06 21:25:04 UTC
asterixdb git commit: [NO ISSUE][ING] Improvements to active retry
policy
Repository: asterixdb
Updated Branches:
refs/heads/master 5c28118b5 -> 7c7ce10f5
[NO ISSUE][ING] Improvements to active retry policy
- user model changes: no
- storage format changes: no
- interface changes: yes
- IRetryPolicy.retry now takes a Throwable parameter
Details:
- This change improves the retry policy for active
entities by passing the cause of the most recent
failure to the policy.
- The change also stops holding the lock on the active
notification handler during the recover call.
Change-Id: I4246e2a276e1f80569a07630e182aab8f49dd115
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2665
Reviewed-by: Michael Blow <mb...@apache.org>
Contrib: Michael Blow <mb...@apache.org>
Integration-Tests: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7c7ce10f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7c7ce10f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7c7ce10f
Branch: refs/heads/master
Commit: 7c7ce10f563dd23af8079d836dda17b77a9a2729
Parents: 5c28118
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Wed Jun 6 21:09:52 2018 +0300
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Wed Jun 6 14:19:42 2018 -0700
----------------------------------------------------------------------
.../apache/asterix/active/CountRetryPolicy.java | 2 +-
.../org/apache/asterix/active/IRetryPolicy.java | 4 +-
.../asterix/active/InfiniteRetryPolicy.java | 2 +-
.../asterix/active/NoRetryPolicyFactory.java | 2 +-
.../app/active/ActiveNotificationHandler.java | 4 +-
.../apache/asterix/app/active/RecoveryTask.java | 5 +-
.../lang/common/util/LangRecordParseUtil.java | 2 +
.../hyracks/api/client/HyracksConnection.java | 80 +++++++++++++-------
.../nc/work/EnsureAllCcTasksCompleted.java | 4 +-
.../java/org/apache/hyracks/util/ExitUtil.java | 6 +-
10 files changed, 70 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
index 6633810..b964430 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/CountRetryPolicy.java
@@ -28,7 +28,7 @@ public class CountRetryPolicy implements IRetryPolicy {
}
@Override
- public boolean retry() {
+ public boolean retry(Throwable failure) {
if (attempted < count) {
attempted++;
return true;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
index a010984..1daf07e 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -21,7 +21,9 @@ package org.apache.asterix.active;
@FunctionalInterface
public interface IRetryPolicy {
/**
+ * @param failure
+ * the cause of the active entity failure
* @return true if one more attempt should be done
*/
- boolean retry();
+ boolean retry(Throwable failure);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
index 074f8f4..fde67e6 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/InfiniteRetryPolicy.java
@@ -27,7 +27,7 @@ public class InfiniteRetryPolicy implements IRetryPolicy {
}
@Override
- public boolean retry() {
+ public boolean retry(Throwable failure) {
synchronized (listener) {
try {
listener.wait(5000); //NOSONAR this method is being called in a while loop
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
index 1596c17..a48283a 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -20,7 +20,7 @@ package org.apache.asterix.active;
public class NoRetryPolicyFactory implements IRetryPolicyFactory {
public static final NoRetryPolicyFactory INSTANCE = new NoRetryPolicyFactory();
- private static final IRetryPolicy policy = () -> false;
+ private static final IRetryPolicy policy = failure -> false;
private NoRetryPolicyFactory() {
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 053e6cd..1b7d5b9 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -207,9 +207,9 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active
}
@Override
- public synchronized void recover() {
+ public void recover() {
LOGGER.log(level, "Starting active recovery");
- for (IActiveEntityEventsListener listener : entityEventListeners.values()) {
+ for (IActiveEntityEventsListener listener : getEventListeners()) {
synchronized (listener) {
LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats());
listener.notifyAll();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
index 2de8319..1f72856 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -93,6 +93,7 @@ public class RecoveryTask {
synchronized (listener) {
if (!cancelRecovery) {
listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setRunning(metadataProvider, false);
}
}
} else {
@@ -112,7 +113,7 @@ public class RecoveryTask {
return null;
}
LOGGER.log(level, "calling the policy");
- while (policy.retry()) {
+ while (policy.retry(failure)) {
synchronized (listener) {
if (cancelRecovery) {
return null;
@@ -170,7 +171,9 @@ public class RecoveryTask {
return null;
}
if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+ LOGGER.warn("Recovery for {} permanently failed", listener.getEntityId());
listener.setState(ActivityState.PERMANENTLY_FAILED);
+ listener.setRunning(metadataProvider, false);
}
listener.notifyAll();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
index c4b23ef..8367fa0 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/LangRecordParseUtil.java
@@ -213,6 +213,8 @@ public class LangRecordParseUtil {
return aOrderedListToString((AOrderedList) aObj);
case STRING:
return ((AString) aObj).getStringValue();
+ case BOOLEAN:
+ return ((ABoolean) aObj).getBoolean().toString();
default:
throw new AlgebricksException("value of type " + aObj.getType() + " is not supported yet");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
index cfa6f78..eaec3c3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/client/HyracksConnection.java
@@ -84,8 +84,8 @@ public final class HyracksConnection implements IHyracksClientConnection {
private volatile long reqId = 0L;
- private final ExecutorService uninterruptibleExecutor = Executors.newFixedThreadPool(2,
- r -> new Thread(r, "HyracksConnection Uninterrubtible thread: " + r.getClass().getSimpleName()));
+ private final ExecutorService uninterruptibleExecutor =
+ Executors.newFixedThreadPool(2, r -> new Thread(r, "HyracksConnection Uninterrubtible thread: "));
private final BlockingQueue<UnInterruptibleRequest<?>> uninterruptibles = new ArrayBlockingQueue<>(1);
@@ -367,6 +367,11 @@ public final class HyracksConnection implements IHyracksClientConnection {
return null;
}
+ @Override
+ public String toString() {
+ return "CancelJobRequest: " + jobId.toString();
+ }
+
}
private class StartDeployedJobRequest extends UnInterruptibleRequest<JobId> {
@@ -407,24 +412,35 @@ public final class HyracksConnection implements IHyracksClientConnection {
}
}
+ @Override
+ public String toString() {
+ return "StartJobRequest";
+ }
+
}
private class UninterrubtileRequestHandler implements Runnable {
@SuppressWarnings({ "squid:S2189", "squid:S2142" })
@Override
public void run() {
- while (true) {
- try {
- UnInterruptibleRequest<?> next = uninterruptibles.take();
- reqId++;
- running = true;
- next.handle();
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
- continue;
- } finally {
- running = false;
+ String nameBefore = Thread.currentThread().getName();
+ Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+ try {
+ while (true) {
+ try {
+ UnInterruptibleRequest<?> current = uninterruptibles.take();
+ reqId++;
+ running = true;
+ current.handle();
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ } finally {
+ running = false;
+ }
}
+ } finally {
+ Thread.currentThread().setName(nameBefore);
}
}
}
@@ -433,25 +449,31 @@ public final class HyracksConnection implements IHyracksClientConnection {
@Override
@SuppressWarnings({ "squid:S2189", "squid:S2142" })
public void run() {
- long currentReqId = 0L;
- long currentTime = System.nanoTime();
- while (true) {
- try {
- TimeUnit.MINUTES.sleep(1);
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
- continue;
- }
- if (running) {
- if (reqId == currentReqId) {
- if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
- ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+ String nameBefore = Thread.currentThread().getName();
+ Thread.currentThread().setName(nameBefore + getClass().getSimpleName());
+ try {
+ long currentReqId = 0L;
+ long currentTime = System.nanoTime();
+ while (true) {
+ try {
+ TimeUnit.MINUTES.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARN, "Ignoring interrupt. This thread should never be interrupted.");
+ continue;
+ }
+ if (running) {
+ if (reqId == currentReqId) {
+ if (TimeUnit.NANOSECONDS.toMinutes(System.nanoTime() - currentTime) > 0) {
+ ExitUtil.halt(ExitUtil.EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST);
+ }
+ } else {
+ currentReqId = reqId;
+ currentTime = System.nanoTime();
}
- } else {
- currentReqId = reqId;
- currentTime = System.nanoTime();
}
}
+ } finally {
+ Thread.currentThread().setName(nameBefore);
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
index 5964c04..0f36c80 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/EnsureAllCcTasksCompleted.java
@@ -66,11 +66,11 @@ public class EnsureAllCcTasksCompleted implements Runnable {
LOGGER.error("{} tasks associated with CC {} failed to complete after {}ms. Giving up",
runningTasks.size(), ccId, TIMEOUT);
logPendingTasks();
- ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
} catch (Throwable th) {
LOGGER.error("Failed to abort all previous tasks associated with CC {}", ccId, th);
- ExitUtil.halt(ExitUtil.NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
+ ExitUtil.halt(ExitUtil.EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7c7ce10f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
index c8b9112..8500842 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java
@@ -33,7 +33,7 @@ public class ExitUtil {
public static final int EC_ABNORMAL_TERMINATION = 1;
public static final int EC_FAILED_TO_STARTUP = 2;
public static final int EC_FAILED_TO_RECOVER = 3;
- public static final int NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
+ public static final int EC_NC_FAILED_TO_ABORT_ALL_PREVIOUS_TASKS = 4;
public static final int EC_FAILED_TO_PROCESS_UN_INTERRUPTIBLE_REQUEST = 5;
public static final int EC_UNHANDLED_EXCEPTION = 11;
public static final int EC_IMMEDIATE_HALT = 33;
@@ -75,8 +75,8 @@ public class ExitUtil {
exit(status);
}
- public static void halt(int status) {
- LOGGER.fatal("JVM halting with status " + status + "; bye!", new Throwable("halt stacktrace"));
+ public static synchronized void halt(int status) {
+ LOGGER.fatal("JVM halting with status {}; thread dump at halt: {}", status, ThreadDumpUtil.takeDumpString());
// try to give time for the log to be emitted...
LogManager.shutdown();
Runtime.getRuntime().halt(status);