You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/04/27 18:17:52 UTC
[incubator-pulsar] branch master updated: Better Runtime failure mgmt (#1671)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f307080 Better Runtime failure mgmt (#1671)
f307080 is described below
commit f307080e9558bf34edfffbecf9bdcdbe08fa7427
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Apr 27 11:17:50 2018 -0700
Better Runtime failure mgmt (#1671)
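* Retooled management of Runtime failures: RuntimeSpawner's liveness check now records a dead runtime's exception and reports it in the function status, and ThreadRuntime no longer restarts itself from its uncaught-exception handler (restarts are left to RuntimeSpawner)
* Removed extra code (the failureException field in JavaInstanceRunnable and the standalone main() in ProcessRuntime)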
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 2 +-
.../functions/instance/JavaInstanceRunnable.java | 7 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 2 +-
.../pulsar/functions/runtime/ProcessRuntime.java | 74 ++++++++++------------
.../pulsar/functions/runtime/RuntimeSpawner.java | 12 ++--
.../pulsar/functions/runtime/ThreadRuntime.java | 21 ++----
6 files changed, 50 insertions(+), 68 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 1369a69..92bd757 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -565,7 +565,7 @@ public class CmdFunctions extends CmdBase {
instanceConfig,
userCodeFile,
containerFactory,
- null);
+ 0);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ce5220f..c16bc10 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -88,8 +88,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
@Getter(AccessLevel.PACKAGE)
private Table<ByteBuf, ByteBuf> stateTable;
- @Getter
- private Exception failureException;
private JavaInstance javaInstance;
private AtomicBoolean running = new AtomicBoolean(true);
@@ -229,10 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
} catch (Exception ex) {
log.error("Uncaught exception in Java Instance", ex);
- if (running.get()) {
- failureException = ex;
- throw new RuntimeException(ex);
- }
+ throw new RuntimeException(ex);
} finally {
close();
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 4b14dad..77d8e97 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -189,7 +189,7 @@ public class JavaInstanceMain {
instanceConfig,
jarFile,
containerFactory,
- null);
+ 0);
server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 6a46097..ba510cc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -54,7 +54,8 @@ class ProcessRuntime implements Runtime {
@Getter
private List<String> processArgs;
private int instancePort;
- private Exception startupException;
+ @Getter
+ private Exception deathException;
private ManagedChannel channel;
private InstanceControlGrpc.InstanceControlFutureStub stub;
@@ -227,8 +228,8 @@ class ProcessRuntime implements Runtime {
public void onFailure(Throwable throwable) {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.setRunning(false);
- if (startupException != null) {
- builder.setFailureException(startupException.getMessage());
+ if (deathException != null) {
+ builder.setFailureException(deathException.getMessage());
} else {
builder.setFailureException(throwable.getMessage());
}
@@ -280,19 +281,20 @@ class ProcessRuntime implements Runtime {
}
private void startProcess() {
- startupException = null;
+ deathException = null;
try {
ProcessBuilder processBuilder = new ProcessBuilder(processArgs);
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start();
} catch (Exception ex) {
log.error("Starting process failed", ex);
- startupException = ex;
+ deathException = ex;
return;
}
try {
int exitValue = process.exitValue();
log.error("Instance Process quit unexpectedly with return value " + exitValue);
+ tryExtractingDeathException();
} catch (IllegalThreadStateException ex) {
log.info("Started process successfully");
}
@@ -300,49 +302,41 @@ class ProcessRuntime implements Runtime {
@Override
public boolean isAlive() {
- return process != null && process.isAlive();
+ if (process == null) {
+ return false;
+ }
+ if (!process.isAlive()) {
+ if (deathException == null) {
+ tryExtractingDeathException();
+ }
+ return false;
+ }
+ FunctionStatus status;
+ try {
+ status = getFunctionStatus().get();
+ } catch (Exception ex) {
+ return false;
+ }
+ if (!status.getRunning()) {
+ if (status.getFailureException() != null && !status.getFailureException().isEmpty()) {
+ deathException = new Exception(status.getFailureException());
+ }
+ return false;
+ }
+ return true;
}
- @Override
- public Exception getDeathException() {
- if (isAlive()) return null;
- if (startupException != null) return startupException;
+ private void tryExtractingDeathException() {
InputStream errorStream = process.getErrorStream();
try {
byte[] errorBytes = new byte[errorStream.available()];
errorStream.read(errorBytes);
String errorMessage = new String(errorBytes);
- startupException = new RuntimeException(errorMessage);
+ deathException = new RuntimeException(errorMessage);
+ log.error("Extracted Process death exception", deathException);
} catch (Exception ex) {
- startupException = ex;
+ deathException = ex;
+ log.error("Error extracting Process death exception", deathException);
}
- return startupException;
- }
-
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- int port = Integer.parseInt(args[0]);
-
- ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
- .usePlaintext(true)
- .build();
- InstanceControlFutureStub stub = InstanceControlGrpc.newFutureStub(channel);
- ListenableFuture<FunctionStatus> response = stub.getFunctionStatus(Empty.newBuilder().build());
- CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
- Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
- @Override
- public void onFailure(Throwable throwable) {
- log.info("GetFunctionStatus:", throwable);
- future.completeExceptionally(throwable);
- }
-
- @Override
- public void onSuccess(InstanceCommunication.FunctionStatus t) {
- log.info("GetFunctionStatus: {}", t);
- future.complete(t);
- }
- });
- FunctionStatus status = future.get();
-
- log.info("Function Status : {}", status);
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 272ca6a..2b020df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -43,12 +43,13 @@ public class RuntimeSpawner implements AutoCloseable {
private Runtime runtime;
private Timer processLivenessCheckTimer;
private int numRestarts;
- private Long instanceLivenessCheckFreqMs;
+ private long instanceLivenessCheckFreqMs;
+ private Exception runtimeDeathException;
public RuntimeSpawner(InstanceConfig instanceConfig,
String codeFile,
- RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
+ RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
this.codeFile = codeFile;
@@ -63,7 +64,7 @@ public class RuntimeSpawner implements AutoCloseable {
runtime.start();
// monitor function runtime to make sure it is running. If not, restart the function runtime
- if (instanceLivenessCheckFreqMs != null) {
+ if (instanceLivenessCheckFreqMs > 0) {
processLivenessCheckTimer = new Timer();
processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
@Override
@@ -71,6 +72,7 @@ public class RuntimeSpawner implements AutoCloseable {
if (!runtime.isAlive()) {
log.error("Function Container is dead with exception", runtime.getDeathException());
log.error("Restarting...");
+ runtimeDeathException = runtime.getDeathException();
runtime.start();
numRestarts++;
}
@@ -89,8 +91,8 @@ public class RuntimeSpawner implements AutoCloseable {
return runtime.getFunctionStatus().thenApply(f -> {
FunctionStatus.Builder builder = FunctionStatus.newBuilder();
builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId());
- if (runtime.getDeathException() != null) {
- builder.setFailureException(runtime.getDeathException().getMessage());
+ if (runtimeDeathException != null) {
+ builder.setFailureException(runtimeDeathException.getMessage());
}
return builder.build();
});
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 136951e..a0d5fb8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -80,14 +80,6 @@ class ThreadRuntime implements Runtime {
public void uncaughtException(Thread t, Throwable e) {
startupException = new Exception(e);
log.error("Error occured in java instance:", e);
- try {
- Thread.sleep(500);
- } catch (InterruptedException e1) {
- //ignore
- }
- // restart
- start();
-
}
});
this.fnThread.start();
@@ -117,13 +109,14 @@ class ThreadRuntime implements Runtime {
@Override
public CompletableFuture<FunctionStatus> getFunctionStatus() {
- FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
- if (javaInstanceRunnable.getFailureException() != null) {
+ if (!isAlive()) {
+ FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
functionStatusBuilder.setRunning(false);
- functionStatusBuilder.setFailureException(javaInstanceRunnable.getFailureException().getMessage());
- } else {
- functionStatusBuilder.setRunning(true);
+ functionStatusBuilder.setFailureException(getDeathException().getMessage());
+ return CompletableFuture.completedFuture(functionStatusBuilder.build());
}
+ FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
+ functionStatusBuilder.setRunning(true);
return CompletableFuture.completedFuture(functionStatusBuilder.build());
}
@@ -147,8 +140,6 @@ class ThreadRuntime implements Runtime {
return null;
} else if (null != startupException) {
return startupException;
- } else if (null != javaInstanceRunnable){
- return javaInstanceRunnable.getFailureException();
} else {
return null;
}
--
To stop receiving notification emails like this one, please contact sijie@apache.org.