You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/23 18:09:57 UTC

[GitHub] sijie closed pull request #1833: adding worker to instance health check

sijie closed pull request #1833: adding worker to instance health check
URL: https://github.com/apache/incubator-pulsar/pull/1833
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub does not show its
diff once it has been merged, so the diff is reproduced below:

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 30103c7360..eedbbea49c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1126,7 +1126,7 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
                         instanceConfig,
                         userCodeFile,
                         containerFactory,
-                        0);
+                        30000);
                 spawners.add(runtimeSpawner);
                 runtimeSpawner.start();
             }
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
index 0b1d45e0cf..b4e814dd54 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,8 +17,6 @@
 # under the License.
 #
 
-# -*- encoding: utf-8 -*-
-
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: InstanceCommunication.proto
 
@@ -42,7 +39,7 @@
   name='InstanceCommunication.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\x32\x9b\x01\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
+  serialized_pb=_b('\n\x1bInstanceCommunication.proto\x12\x05proto\x1a\x1bgoogle/protobuf/empty.proto\"\xa1\x05\n\x0e\x46unctionStatus\x12\x0f\n\x07running\x18\x01 \x01(\x08\x12\x18\n\x10\x66\x61ilureException\x18\x02 \x01(\t\x12\x13\n\x0bnumRestarts\x18\x03 \x01(\x03\x12\x14\n\x0cnumProcessed\x18\x04 \x01(\x03\x12 \n\x18numSuccessfullyProcessed\x18\x05 \x01(\x03\x12\x19\n\x11numUserExceptions\x18\x06 \x01(\x03\x12H\n\x14latestUserExceptions\x18\x07 \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12\x1b\n\x13numSystemExceptions\x18\x08 \x01(\x03\x12J\n\x16latestSystemExceptions\x18\t \x03(\x0b\x32*.proto.FunctionStatus.ExceptionInformation\x12W\n\x19\x64\x65serializationExceptions\x18\n \x03(\x0b\x32\x34.proto.FunctionStatus.DeserializationExceptionsEntry\x12\x1f\n\x17serializationExceptions\x18\x0b \x01(\x03\x12\x16\n\x0e\x61verageLatency\x18\x0c \x01(\x01\x12\x1a\n\x12lastInvocationTime\x18\r \x01(\x03\x12\x12\n\ninstanceId\x18\x0e \x01(\t\x1a\x45\n\x14\x45xceptionInformation\x12\x17\n\x0f\x65xceptionString\x18\x01 \x01(\t\x12\x14\n\x0cmsSinceEpoch\x18\x02 \x01(\x03\x1a@\n\x1e\x44\x65serializationExceptionsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\x03:\x02\x38\x01\"G\n\x12\x46unctionStatusList\x12\x31\n\x12\x66unctionStatusList\x18\x01 \x03(\x0b\x32\x15.proto.FunctionStatus\"\xd2\x01\n\x0bMetricsData\x12\x30\n\x07metrics\x18\x01 \x03(\x0b\x32\x1f.proto.MetricsData.MetricsEntry\x1a\x42\n\nDataDigest\x12\r\n\x05\x63ount\x18\x01 \x01(\x01\x12\x0b\n\x03sum\x18\x02 \x01(\x01\x12\x0b\n\x03max\x18\x03 \x01(\x01\x12\x0b\n\x03min\x18\x04 \x01(\x01\x1aM\n\x0cMetricsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.proto.MetricsData.DataDigest:\x02\x38\x01\"$\n\x11HealthCheckResult\x12\x0f\n\x07success\x18\x01 \x01(\x08\x32\xde\x01\n\x0fInstanceControl\x12\x44\n\x11GetFunctionStatus\x12\x16.google.protobuf.Empty\x1a\x15.proto.FunctionStatus\"\x00\x12\x42\n\x12GetAndResetMetrics\x12\x16.google.protobuf.Empty\x1a\x12.proto.MetricsData\"\x00\x12\x41\n\x0bHealthCheck\x12\x16.google.protobuf.Empty\x1a\x18.proto.HealthCheckResult\"\x00\x42:\n!org.apache.pulsar.functions.protoB\x15InstanceCommunicationb\x06proto3')
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -394,6 +391,37 @@
   serialized_end=1027,
 )
 
+
+_HEALTHCHECKRESULT = _descriptor.Descriptor(
+  name='HealthCheckResult',
+  full_name='proto.HealthCheckResult',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='success', full_name='proto.HealthCheckResult.success', index=0,
+      number=1, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=1029,
+  serialized_end=1065,
+)
+
 _FUNCTIONSTATUS_EXCEPTIONINFORMATION.containing_type = _FUNCTIONSTATUS
 _FUNCTIONSTATUS_DESERIALIZATIONEXCEPTIONSENTRY.containing_type = _FUNCTIONSTATUS
 _FUNCTIONSTATUS.fields_by_name['latestUserExceptions'].message_type = _FUNCTIONSTATUS_EXCEPTIONINFORMATION
@@ -407,6 +435,7 @@
 DESCRIPTOR.message_types_by_name['FunctionStatus'] = _FUNCTIONSTATUS
 DESCRIPTOR.message_types_by_name['FunctionStatusList'] = _FUNCTIONSTATUSLIST
 DESCRIPTOR.message_types_by_name['MetricsData'] = _METRICSDATA
+DESCRIPTOR.message_types_by_name['HealthCheckResult'] = _HEALTHCHECKRESULT
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
 FunctionStatus = _reflection.GeneratedProtocolMessageType('FunctionStatus', (_message.Message,), dict(
@@ -462,6 +491,13 @@
 _sym_db.RegisterMessage(MetricsData.DataDigest)
 _sym_db.RegisterMessage(MetricsData.MetricsEntry)
 
+HealthCheckResult = _reflection.GeneratedProtocolMessageType('HealthCheckResult', (_message.Message,), dict(
+  DESCRIPTOR = _HEALTHCHECKRESULT,
+  __module__ = 'InstanceCommunication_pb2'
+  # @@protoc_insertion_point(class_scope:proto.HealthCheckResult)
+  ))
+_sym_db.RegisterMessage(HealthCheckResult)
+
 
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\025InstanceCommunication'))
@@ -476,8 +512,8 @@
   file=DESCRIPTOR,
   index=0,
   options=None,
-  serialized_start=1030,
-  serialized_end=1185,
+  serialized_start=1068,
+  serialized_end=1290,
   methods=[
   _descriptor.MethodDescriptor(
     name='GetFunctionStatus',
@@ -497,6 +533,15 @@
     output_type=_METRICSDATA,
     options=None,
   ),
+  _descriptor.MethodDescriptor(
+    name='HealthCheck',
+    full_name='proto.InstanceControl.HealthCheck',
+    index=2,
+    containing_service=None,
+    input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
+    output_type=_HEALTHCHECKRESULT,
+    options=None,
+  ),
 ])
 _sym_db.RegisterServiceDescriptor(_INSTANCECONTROL)
 
diff --git a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
index b37ee197bb..575f994641 100644
--- a/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
+++ b/pulsar-functions/instance/src/main/python/InstanceCommunication_pb2_grpc.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -18,8 +17,6 @@
 # under the License.
 #
 
-# -*- encoding: utf-8 -*-
-
 # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
 import grpc
 
@@ -47,6 +44,11 @@ def __init__(self, channel):
         request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
         response_deserializer=InstanceCommunication__pb2.MetricsData.FromString,
         )
+    self.HealthCheck = channel.unary_unary(
+        '/proto.InstanceControl/HealthCheck',
+        request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+        response_deserializer=InstanceCommunication__pb2.HealthCheckResult.FromString,
+        )
 
 
 class InstanceControlServicer(object):
@@ -67,6 +69,13 @@ def GetAndResetMetrics(self, request, context):
     context.set_details('Method not implemented!')
     raise NotImplementedError('Method not implemented!')
 
+  def HealthCheck(self, request, context):
+    # missing associated documentation comment in .proto file
+    pass
+    context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+    context.set_details('Method not implemented!')
+    raise NotImplementedError('Method not implemented!')
+
 
 def add_InstanceControlServicer_to_server(servicer, server):
   rpc_method_handlers = {
@@ -80,6 +89,11 @@ def add_InstanceControlServicer_to_server(servicer, server):
           request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
           response_serializer=InstanceCommunication__pb2.MetricsData.SerializeToString,
       ),
+      'HealthCheck': grpc.unary_unary_rpc_method_handler(
+          servicer.HealthCheck,
+          request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+          response_serializer=InstanceCommunication__pb2.HealthCheckResult.SerializeToString,
+      ),
   }
   generic_handler = grpc.method_handlers_generic_handler(
       'proto.InstanceControl', rpc_method_handlers)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 57bb292b2b..6a82b0ad19 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -24,12 +24,15 @@
 """
 import base64
 import os
+import signal
 import time
 import Queue
 import threading
 from functools import partial
 from collections import namedtuple
+from threading import Timer
 import traceback
+import sys
 
 import pulsar
 import contextimpl
@@ -116,6 +119,21 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.contextimpl = None
     self.total_stats = Stats()
     self.current_stats = Stats()
+    self.last_health_check_ts = time.time()
+
+  def health_check(self):
+    self.last_health_check_ts = time.time()
+    health_check_result = InstanceCommunication_pb2.HealthCheckResult()
+    health_check_result.success = True
+    return health_check_result
+
+  def process_spawner_health_check_timer(self):
+    if time.time() - self.last_health_check_ts > 90:
+      Log.critical("Haven't received health check from spawner in a while. Stopping instance...")
+      os.kill(os.getpid(), signal.SIGTERM)
+      sys.exit(1)
+
+    Timer(30, self.process_spawner_health_check_timer).start()
 
   def run(self):
     # Setup consumers and input deserializers
@@ -153,6 +171,10 @@ def run(self):
     self.exeuction_thread = threading.Thread(target=self.actual_execution)
     self.exeuction_thread.start()
 
+    # start proccess spawner health check timer
+    self.last_health_check_ts = time.time()
+    Timer(30, self.process_spawner_health_check_timer).start()
+
   def actual_execution(self):
     Log.info("Started Thread for executing the function")
     while True:
diff --git a/pulsar-functions/instance/src/main/python/server.py b/pulsar-functions/instance/src/main/python/server.py
index 3fe7bd44ac..193e6ab928 100644
--- a/pulsar-functions/instance/src/main/python/server.py
+++ b/pulsar-functions/instance/src/main/python/server.py
@@ -42,6 +42,9 @@ def GetAndResetMetrics(self, request, context):
     Log.info("Came in GetAndResetMetrics")
     return self.pyinstance.get_and_reset_metrics()
 
+  def HealthCheck(self, request, context):
+    return self.pyinstance.health_check()
+
 
 def serve(port, pyinstance):
   server_instance = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 3ac0924d4c..8db742b641 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -65,7 +65,12 @@ message MetricsData {
     map<string, DataDigest> metrics = 1;
 }
 
+message HealthCheckResult {
+    bool success = 1;
+}
+
 service InstanceControl {
     rpc GetFunctionStatus(google.protobuf.Empty) returns (FunctionStatus) {}
     rpc GetAndResetMetrics(google.protobuf.Empty) returns (MetricsData) {}
+    rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 4e23038325..c01f6bbb2c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -39,13 +39,17 @@
 
 import java.lang.reflect.Type;
 import java.util.Map;
+import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A function container implemented using java thread.
  */
 @Slf4j
-public class JavaInstanceMain {
+public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--function_classname", description = "Function Class Name\n", required = true)
     protected String className;
     @Parameter(
@@ -123,6 +127,9 @@
     protected String sinkSerdeClassName;
 
     private Server server;
+    private RuntimeSpawner runtimeSpawner;
+    private Long lastHealthCheckTs = null;
+    private ScheduledExecutorService timer;
 
     public JavaInstanceMain() { }
 
@@ -190,8 +197,7 @@ public void start() throws Exception {
                 "LocalRunnerThreadGroup",
                 pulsarServiceUrl,
                 stateStorageServiceUrl);
-
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+        runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
                 containerFactory,
@@ -216,9 +222,25 @@ public void run() {
         });
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
+
+        timer = Executors.newSingleThreadScheduledExecutor();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    if (System.currentTimeMillis() - lastHealthCheckTs > 90000) {
+                        log.info("Haven't received health check from spawner in a while. Stopping instance...");
+                        close();
+                    }
+                } catch (Exception e) {
+                    log.error("Error occurred when checking for latest health check", e);
+                }
+            }
+        }, 30000, 30000, TimeUnit.MILLISECONDS);
+
         runtimeSpawner.join();
         log.info("RuntimeSpawner quit, shutting down JavaInstance");
-        server.shutdown();
+        close();
     }
 
     public static void main(String[] args) throws Exception {
@@ -231,11 +253,31 @@ public static void main(String[] args) throws Exception {
         javaInstanceMain.start();
     }
 
-    static class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
+    @Override
+    public void close() {
+        try {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            if (server != null) {
+                server.shutdown();
+            }
+            if (runtimeSpawner != null) {
+                runtimeSpawner.close();
+            }
+            if (timer != null) {
+                timer.shutdown();
+            }
+        } catch (Exception ex) {
+            System.err.println(ex);
+        }
+    }
+
+
+    class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
         private RuntimeSpawner runtimeSpawner;
 
         public InstanceControlImpl(RuntimeSpawner runtimeSpawner) {
             this.runtimeSpawner = runtimeSpawner;
+            lastHealthCheckTs = System.currentTimeMillis();
         }
 
         @Override
@@ -266,5 +308,16 @@ public void getAndResetMetrics(com.google.protobuf.Empty request,
             }
         }
 
+        @Override
+        public void healthCheck(com.google.protobuf.Empty request,
+                                io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) {
+            log.debug("Recieved health check request...");
+            InstanceCommunication.HealthCheckResult healthCheckResult
+                    = InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();
+            responseObserver.onNext(healthCheckResult);
+            responseObserver.onCompleted();
+
+            lastHealthCheckTs = System.currentTimeMillis();
+        }
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 5b77eb277c..977b9b1c56 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -26,7 +26,6 @@
 import com.google.protobuf.Empty;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import java.util.concurrent.ExecutionException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -35,12 +34,12 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.net.ServerSocket;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.functions.proto.InstanceControlGrpc.InstanceControlFutureStub;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A function container implemented using java thread.
@@ -58,12 +57,15 @@
     private Exception deathException;
     private ManagedChannel channel;
     private InstanceControlGrpc.InstanceControlFutureStub stub;
+    private ScheduledExecutorService timer;
+    private InstanceConfig instanceConfig;
 
     ProcessRuntime(InstanceConfig instanceConfig,
                    String instanceFile,
                    String logDirectory,
                    String codeFile,
                    String pulsarServiceUrl) {
+        this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl);
     }
@@ -205,6 +207,21 @@ public void start() {
                     .usePlaintext(true)
                     .build();
             stub = InstanceControlGrpc.newFutureStub(channel);
+
+            timer = Executors.newSingleThreadScheduledExecutor();
+            timer.scheduleAtFixedRate(new TimerTask() {
+
+                @Override
+                public void run() {
+                    CompletableFuture<InstanceCommunication.HealthCheckResult> result = healthCheck();
+                    try {
+                    } catch (Exception e) {
+                        log.error("Health check failed for {}-{}",
+                                instanceConfig.getFunctionDetails().getName(),
+                                instanceConfig.getInstanceId(), e);
+                    }
+                }
+            }, 30000, 30000, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -215,8 +232,15 @@ public void join() throws Exception {
 
     @Override
     public void stop() {
-        process.destroy();
-        channel.shutdown();
+        if (timer != null) {
+            timer.shutdown();
+        }
+        if (process != null) {
+            process.destroy();
+        }
+        if (channel != null) {
+            channel.shutdown();
+        }
         channel = null;
         stub = null;
     }
@@ -272,6 +296,27 @@ public void onSuccess(InstanceCommunication.MetricsData t) {
         return retval;
     }
 
+    public CompletableFuture<InstanceCommunication.HealthCheckResult> healthCheck() {
+        CompletableFuture<InstanceCommunication.HealthCheckResult> retval = new CompletableFuture<>();
+        if (stub == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        ListenableFuture<InstanceCommunication.HealthCheckResult> response = stub.healthCheck(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<InstanceCommunication.HealthCheckResult>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                retval.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onSuccess(InstanceCommunication.HealthCheckResult t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+
     private void startProcess() {
         deathException = null;
         try {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index 29a8e8142e..18421f45d8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -41,5 +41,5 @@
     CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus();
 
     CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
-
+    
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 0994adcc73..60b41def06 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -27,10 +27,12 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.utils.Utils;
 
@@ -118,6 +120,9 @@ public void close() {
             runtime.stop();
             runtime = null;
         }
+        if (runtimeFactory != null) {
+            runtimeFactory.close();
+        }
         if (processLivenessCheckTimer != null) {
             processLivenessCheckTimer.cancel();
             processLivenessCheckTimer = null;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services