You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/04/27 18:17:52 UTC

[incubator-pulsar] branch master updated: Better Runtime failure mgmt (#1671)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f307080  Better Runtime failure mgmt (#1671)
f307080 is described below

commit f307080e9558bf34edfffbecf9bdcdbe08fa7427
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Apr 27 11:17:50 2018 -0700

    Better Runtime failure mgmt (#1671)
    
    * Retooled management of Runtime failures
    
    * Removed extra code
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  2 +-
 .../functions/instance/JavaInstanceRunnable.java   |  7 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  2 +-
 .../pulsar/functions/runtime/ProcessRuntime.java   | 74 ++++++++++------------
 .../pulsar/functions/runtime/RuntimeSpawner.java   | 12 ++--
 .../pulsar/functions/runtime/ThreadRuntime.java    | 21 ++----
 6 files changed, 50 insertions(+), 68 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 1369a69..92bd757 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -565,7 +565,7 @@ public class CmdFunctions extends CmdBase {
                             instanceConfig,
                             userCodeFile,
                             containerFactory,
-                            null);
+                            0);
                     spawners.add(runtimeSpawner);
                     runtimeSpawner.start();
                 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ce5220f..c16bc10 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -88,8 +88,6 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     @Getter(AccessLevel.PACKAGE)
     private Table<ByteBuf, ByteBuf> stateTable;
 
-    @Getter
-    private Exception failureException;
     private JavaInstance javaInstance;
     private AtomicBoolean running = new AtomicBoolean(true);
 
@@ -229,10 +227,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             }
         } catch (Exception ex) {
             log.error("Uncaught exception in Java Instance", ex);
-            if (running.get()) {
-                failureException = ex;
-                throw new RuntimeException(ex);
-            }
+            throw new RuntimeException(ex);
         } finally {
             close();
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 4b14dad..77d8e97 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -189,7 +189,7 @@ public class JavaInstanceMain {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                null);
+                0);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 6a46097..ba510cc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -54,7 +54,8 @@ class ProcessRuntime implements Runtime {
     @Getter
     private List<String> processArgs;
     private int instancePort;
-    private Exception startupException;
+    @Getter
+    private Exception deathException;
     private ManagedChannel channel;
     private InstanceControlGrpc.InstanceControlFutureStub stub;
 
@@ -227,8 +228,8 @@ class ProcessRuntime implements Runtime {
             public void onFailure(Throwable throwable) {
                 FunctionStatus.Builder builder = FunctionStatus.newBuilder();
                 builder.setRunning(false);
-                if (startupException != null) {
-                    builder.setFailureException(startupException.getMessage());
+                if (deathException != null) {
+                    builder.setFailureException(deathException.getMessage());
                 } else {
                     builder.setFailureException(throwable.getMessage());
                 }
@@ -280,19 +281,20 @@ class ProcessRuntime implements Runtime {
     }
 
     private void startProcess() {
-        startupException = null;
+        deathException = null;
         try {
             ProcessBuilder processBuilder = new ProcessBuilder(processArgs);
             log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
             process = processBuilder.start();
         } catch (Exception ex) {
             log.error("Starting process failed", ex);
-            startupException = ex;
+            deathException = ex;
             return;
         }
         try {
             int exitValue = process.exitValue();
             log.error("Instance Process quit unexpectedly with return value " + exitValue);
+            tryExtractingDeathException();
         } catch (IllegalThreadStateException ex) {
             log.info("Started process successfully");
         }
@@ -300,49 +302,41 @@ class ProcessRuntime implements Runtime {
 
     @Override
     public boolean isAlive() {
-        return process != null && process.isAlive();
+        if (process == null) {
+            return false;
+        }
+        if (!process.isAlive()) {
+            if (deathException == null) {
+                tryExtractingDeathException();
+            }
+            return false;
+        }
+        FunctionStatus status;
+        try {
+            status = getFunctionStatus().get();
+        } catch (Exception ex) {
+            return false;
+        }
+        if (!status.getRunning()) {
+            if (status.getFailureException() != null && !status.getFailureException().isEmpty()) {
+                deathException = new Exception(status.getFailureException());
+            }
+            return false;
+        }
+        return true;
     }
 
-    @Override
-    public Exception getDeathException() {
-        if (isAlive()) return null;
-        if (startupException != null) return startupException;
+    private void tryExtractingDeathException() {
         InputStream errorStream = process.getErrorStream();
         try {
             byte[] errorBytes = new byte[errorStream.available()];
             errorStream.read(errorBytes);
             String errorMessage = new String(errorBytes);
-            startupException = new RuntimeException(errorMessage);
+            deathException = new RuntimeException(errorMessage);
+            log.error("Extracted Process death exception", deathException);
         } catch (Exception ex) {
-            startupException = ex;
+            deathException = ex;
+            log.error("Error extracting Process death exception", deathException);
         }
-        return startupException;
-    }
-
-    public static void main(String[] args) throws ExecutionException, InterruptedException {
-        int port = Integer.parseInt(args[0]);
-
-        ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", port)
-                .usePlaintext(true)
-                .build();
-        InstanceControlFutureStub stub = InstanceControlGrpc.newFutureStub(channel);
-        ListenableFuture<FunctionStatus> response = stub.getFunctionStatus(Empty.newBuilder().build());
-        CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
-        Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
-            @Override
-            public void onFailure(Throwable throwable) {
-                log.info("GetFunctionStatus:", throwable);
-                future.completeExceptionally(throwable);
-            }
-
-            @Override
-            public void onSuccess(InstanceCommunication.FunctionStatus t) {
-                log.info("GetFunctionStatus: {}", t);
-                future.complete(t);
-            }
-        });
-        FunctionStatus status = future.get();
-
-        log.info("Function Status : {}", status);
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 272ca6a..2b020df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -43,12 +43,13 @@ public class RuntimeSpawner implements AutoCloseable {
     private Runtime runtime;
     private Timer processLivenessCheckTimer;
     private int numRestarts;
-    private Long instanceLivenessCheckFreqMs;
+    private long instanceLivenessCheckFreqMs;
+    private Exception runtimeDeathException;
 
 
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
-                          RuntimeFactory containerFactory, Long instanceLivenessCheckFreqMs) {
+                          RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
         this.codeFile = codeFile;
@@ -63,7 +64,7 @@ public class RuntimeSpawner implements AutoCloseable {
         runtime.start();
 
         // monitor function runtime to make sure it is running.  If not, restart the function runtime
-        if (instanceLivenessCheckFreqMs != null) {
+        if (instanceLivenessCheckFreqMs > 0) {
             processLivenessCheckTimer = new Timer();
             processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
@@ -71,6 +72,7 @@ public class RuntimeSpawner implements AutoCloseable {
                     if (!runtime.isAlive()) {
                         log.error("Function Container is dead with exception", runtime.getDeathException());
                         log.error("Restarting...");
+                        runtimeDeathException = runtime.getDeathException();
                         runtime.start();
                         numRestarts++;
                     }
@@ -89,8 +91,8 @@ public class RuntimeSpawner implements AutoCloseable {
         return runtime.getFunctionStatus().thenApply(f -> {
            FunctionStatus.Builder builder = FunctionStatus.newBuilder();
            builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceId());
-           if (runtime.getDeathException() != null) {
-               builder.setFailureException(runtime.getDeathException().getMessage());
+           if (runtimeDeathException != null) {
+               builder.setFailureException(runtimeDeathException.getMessage());
            }
            return builder.build();
         });
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 136951e..a0d5fb8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -80,14 +80,6 @@ class ThreadRuntime implements Runtime {
             public void uncaughtException(Thread t, Throwable e) {
                 startupException = new Exception(e);
                 log.error("Error occured in java instance:", e);
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e1) {
-                    //ignore
-                }
-                // restart
-                start();
-
             }
         });
         this.fnThread.start();
@@ -117,13 +109,14 @@ class ThreadRuntime implements Runtime {
 
     @Override
     public CompletableFuture<FunctionStatus> getFunctionStatus() {
-        FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
-        if (javaInstanceRunnable.getFailureException() != null) {
+        if (!isAlive()) {
+            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
             functionStatusBuilder.setRunning(false);
-            functionStatusBuilder.setFailureException(javaInstanceRunnable.getFailureException().getMessage());
-        } else {
-            functionStatusBuilder.setRunning(true);
+            functionStatusBuilder.setFailureException(getDeathException().getMessage());
+            return CompletableFuture.completedFuture(functionStatusBuilder.build());
         }
+        FunctionStatus.Builder functionStatusBuilder = javaInstanceRunnable.getFunctionStatus();
+        functionStatusBuilder.setRunning(true);
         return CompletableFuture.completedFuture(functionStatusBuilder.build());
     }
 
@@ -147,8 +140,6 @@ class ThreadRuntime implements Runtime {
             return null;
         } else if (null != startupException) {
             return startupException;
-        } else if (null != javaInstanceRunnable){
-            return javaInstanceRunnable.getFailureException();
         } else {
             return null;
         }

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.