You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/21 05:27:25 UTC

[incubator-pulsar] branch master updated: Make use of workerconfig defined health check interval (#2626)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aff681d  Make use of workerconfig defined health check interval (#2626)
aff681d is described below

commit aff681d372a8a8616bb1314dfb6e7594d0e7984f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 20 22:27:20 2018 -0700

    Make use of workerconfig defined health check interval (#2626)
---
 .../instance/src/main/python/python_instance.py    | 10 ++++---
 .../src/main/python/python_instance_main.py        |  5 +++-
 .../pulsar/functions/runtime/JavaInstanceMain.java | 31 +++++++++++++---------
 .../pulsar/functions/runtime/ProcessRuntime.java   | 10 ++++---
 .../functions/runtime/ProcessRuntimeFactory.java   |  6 +++--
 .../pulsar/functions/runtime/RuntimeFactory.java   |  3 ++-
 .../pulsar/functions/runtime/RuntimeSpawner.java   |  3 ++-
 .../functions/runtime/ThreadRuntimeFactory.java    |  3 ++-
 .../functions/runtime/ProcessRuntimeTest.java      | 14 +++++-----
 .../functions/worker/FunctionActionerTest.java     |  2 +-
 10 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index ddd546e..f8a14ff 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,7 +114,7 @@ class Stats(object):
     
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
@@ -138,6 +138,7 @@ class PythonInstance(object):
     self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
+    self.expected_healthcheck_interval = expected_healthcheck_interval
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -146,12 +147,12 @@ class PythonInstance(object):
     return health_check_result
 
   def process_spawner_health_check_timer(self):
-    if time.time() - self.last_health_check_ts > 90:
+    if time.time() - self.last_health_check_ts > self.expected_healthcheck_interval * 3:
       Log.critical("Haven't received health check from spawner in a while. Stopping instance...")
       os.kill(os.getpid(), signal.SIGKILL)
       sys.exit(1)
 
-    Timer(30, self.process_spawner_health_check_timer).start()
+    Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
 
   def run(self):
     # Setup consumers and input deserializers
@@ -214,7 +215,8 @@ class PythonInstance(object):
 
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
-    Timer(30, self.process_spawner_health_check_timer).start()
+    if self.expected_healthcheck_interval > 0:
+      Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
 
   def actual_execution(self):
     Log.info("Started Thread for executing the function")
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d9f1132..2f5c895 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -97,7 +98,9 @@ def main():
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                               str(args.function_version), function_details,
-                                              int(args.max_buffered_tuples), str(args.py), pulsar_client)
+                                              int(args.max_buffered_tuples),
+                                              int(args.expected_healthcheck_interval),
+                                              str(args.py), pulsar_client)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 083686b..38a4c28 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -92,6 +92,9 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
     protected int maxBufferedTuples;
 
+    @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
+    protected int expectedHealthCheckInterval;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -124,7 +127,7 @@ public class JavaInstanceMain implements AutoCloseable {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                30000);
+                expectedHealthCheckInterval * 1000);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
@@ -146,20 +149,22 @@ public class JavaInstanceMain implements AutoCloseable {
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
-        timer = Executors.newSingleThreadScheduledExecutor();
-        timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    if (System.currentTimeMillis() - lastHealthCheckTs > 90000) {
-                        log.info("Haven't received health check from spawner in a while. Stopping instance...");
-                        close();
+        if (expectedHealthCheckInterval > 0) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+            timer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
+                            log.info("Haven't received health check from spawner in a while. Stopping instance...");
+                            close();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error occurred when checking for latest health check", e);
                     }
-                } catch (Exception e) {
-                    log.error("Error occurred when checking for latest health check", e);
                 }
-            }
-        }, 30000, 30000, TimeUnit.MILLISECONDS);
+            }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
+        }
 
         runtimeSpawner.join();
         log.info("RuntimeSpawner quit, shutting down JavaInstance");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c5159b0..044f636 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,11 +73,12 @@ class ProcessRuntime implements Runtime {
                    String codeFile,
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
-                   AuthenticationConfig authConfig) throws Exception {
+                   AuthenticationConfig authConfig,
+                   Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig);
+                authConfig, expectedHealthCheckInterval);
     }
 
     private List<String> composeArgs(InstanceConfig instanceConfig,
@@ -86,7 +87,8 @@ class ProcessRuntime implements Runtime {
                                      String codeFile,
                                      String pulsarServiceUrl,
                                      String stateStorageServiceUrl,
-                                     AuthenticationConfig authConfig) throws Exception {
+                                     AuthenticationConfig authConfig,
+                                     Long expectedHealthCheckInterval) throws Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -167,6 +169,8 @@ class ProcessRuntime implements Runtime {
             args.add("--state_storage_serviceurl");
             args.add(stateStorageServiceUrl);
         }
+        args.add("--expected_healthcheck_interval");
+        args.add(String.valueOf(expectedHealthCheckInterval));
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index b16e88d..7b910bc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     }
 
     @Override
-    public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) throws Exception {
+    public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
+                                          Long expectedHealthCheckInterval) throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
@@ -110,7 +111,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
             codeFile,
             pulsarServiceUrl,
             stateStorageServiceUrl,
-            authConfig);
+            authConfig,
+            expectedHealthCheckInterval);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 0fe1f9f..ef2ea9c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,8 @@ public interface RuntimeFactory extends AutoCloseable {
      * @return function container to start/stop instance
      */
     Runtime createContainer(
-            InstanceConfig instanceConfig, String codeFile) throws Exception;
+            InstanceConfig instanceConfig, String codeFile,
+            Long expectedHealthCheckInterval) throws Exception;
 
     @Override
     void close();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aad7d4a..00c09ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -74,7 +74,8 @@ public class RuntimeSpawner implements AutoCloseable {
             throw new IllegalArgumentException("topics-pattern is not supported for python function");
         }
 
-        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
+        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+                instanceLivenessCheckFreqMs / 1000); // ms -> seconds, the unit of --expected_healthcheck_interval
         runtime.start();
 
         // monitor function runtime to make sure it is running.  If not, restart the function runtime
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index f9c9cff..84d21af 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -81,7 +81,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     }
     
     @Override
-    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile) {
+    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
+                                         Long expectedHealthCheckInterval) {
         return new ThreadRuntime(
             instanceConfig,
             fnCache,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 6f1e2f3..97881a4 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,9 +114,9 @@ public class ProcessRuntimeTest {
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 26);
+        assertEquals(args.size(), 28);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +129,8 @@ public class ProcessRuntimeTest {
                 + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
-                + " --state_storage_serviceurl " + stateStorageServiceUrl;
+                + " --state_storage_serviceurl " + stateStorageServiceUrl
+                + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -137,9 +138,9 @@ public class ProcessRuntimeTest {
     public void testPythonConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 22);
+        assertEquals(args.size(), 24);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -147,7 +148,8 @@ public class ProcessRuntimeTest {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(21);
+                + " --max_buffered_tuples 1024 --port " + args.get(21)
+                + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 5754477..05e3422 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public class FunctionActionerTest {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";