You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/21 05:27:25 UTC
[incubator-pulsar] branch master updated: Make use of workerconfig defined health check interval (#2626)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aff681d Make use of workerconfig defined health check interval (#2626)
aff681d is described below
commit aff681d372a8a8616bb1314dfb6e7594d0e7984f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Sep 20 22:27:20 2018 -0700
Make use of workerconfig defined health check interval (#2626)
---
.../instance/src/main/python/python_instance.py | 10 ++++---
.../src/main/python/python_instance_main.py | 5 +++-
.../pulsar/functions/runtime/JavaInstanceMain.java | 31 +++++++++++++---------
.../pulsar/functions/runtime/ProcessRuntime.java | 10 ++++---
.../functions/runtime/ProcessRuntimeFactory.java | 6 +++--
.../pulsar/functions/runtime/RuntimeFactory.java | 3 ++-
.../pulsar/functions/runtime/RuntimeSpawner.java | 3 ++-
.../functions/runtime/ThreadRuntimeFactory.java | 3 ++-
.../functions/runtime/ProcessRuntimeTest.java | 14 +++++-----
.../functions/worker/FunctionActionerTest.java | 2 +-
10 files changed, 54 insertions(+), 33 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index ddd546e..f8a14ff 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,7 +114,7 @@ class Stats(object):
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, user_code, pulsar_client):
+ def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = Queue.Queue(max_buffered_tuples)
@@ -138,6 +138,7 @@ class PythonInstance(object):
self.stats = Stats()
self.last_health_check_ts = time.time()
self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
+ self.expected_healthcheck_interval = expected_healthcheck_interval
def health_check(self):
self.last_health_check_ts = time.time()
@@ -146,12 +147,12 @@ class PythonInstance(object):
return health_check_result
def process_spawner_health_check_timer(self):
- if time.time() - self.last_health_check_ts > 90:
+ if time.time() - self.last_health_check_ts > self.expected_healthcheck_interval * 3:
Log.critical("Haven't received health check from spawner in a while. Stopping instance...")
os.kill(os.getpid(), signal.SIGKILL)
sys.exit(1)
- Timer(30, self.process_spawner_health_check_timer).start()
+ Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
def run(self):
# Setup consumers and input deserializers
@@ -214,7 +215,8 @@ class PythonInstance(object):
# start proccess spawner health check timer
self.last_health_check_ts = time.time()
- Timer(30, self.process_spawner_health_check_timer).start()
+ if self.expected_healthcheck_interval > 0:
+ Timer(self.expected_healthcheck_interval, self.process_spawner_health_check_timer).start()
def actual_execution(self):
Log.info("Started Thread for executing the function")
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d9f1132..2f5c895 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
+ parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -97,7 +98,9 @@ def main():
pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
str(args.function_version), function_details,
- int(args.max_buffered_tuples), str(args.py), pulsar_client)
+ int(args.max_buffered_tuples),
+ int(args.expected_healthcheck_interval),
+ str(args.py), pulsar_client)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 083686b..38a4c28 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -92,6 +92,9 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
+ @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
+ protected int expectedHealthCheckInterval;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -124,7 +127,7 @@ public class JavaInstanceMain implements AutoCloseable {
instanceConfig,
jarFile,
containerFactory,
- 30000);
+ expectedHealthCheckInterval * 1000);
server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
@@ -146,20 +149,22 @@ public class JavaInstanceMain implements AutoCloseable {
log.info("Starting runtimeSpawner");
runtimeSpawner.start();
- timer = Executors.newSingleThreadScheduledExecutor();
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- if (System.currentTimeMillis() - lastHealthCheckTs > 90000) {
- log.info("Haven't received health check from spawner in a while. Stopping instance...");
- close();
+ if (expectedHealthCheckInterval > 0) {
+ timer = Executors.newSingleThreadScheduledExecutor();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
+ log.info("Haven't received health check from spawner in a while. Stopping instance...");
+ close();
+ }
+ } catch (Exception e) {
+ log.error("Error occurred when checking for latest health check", e);
}
- } catch (Exception e) {
- log.error("Error occurred when checking for latest health check", e);
}
- }
- }, 30000, 30000, TimeUnit.MILLISECONDS);
+ }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
+ }
runtimeSpawner.join();
log.info("RuntimeSpawner quit, shutting down JavaInstance");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c5159b0..044f636 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,11 +73,12 @@ class ProcessRuntime implements Runtime {
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) throws Exception {
+ AuthenticationConfig authConfig,
+ Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
- authConfig);
+ authConfig, expectedHealthCheckInterval);
}
private List<String> composeArgs(InstanceConfig instanceConfig,
@@ -86,7 +87,8 @@ class ProcessRuntime implements Runtime {
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) throws Exception {
+ AuthenticationConfig authConfig,
+ Long expectedHealthCheckInterval) throws Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -167,6 +169,8 @@ class ProcessRuntime implements Runtime {
args.add("--state_storage_serviceurl");
args.add(stateStorageServiceUrl);
}
+ args.add("--expected_healthcheck_interval");
+ args.add(String.valueOf(expectedHealthCheckInterval));
return args;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index b16e88d..7b910bc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
}
@Override
- public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile) throws Exception {
+ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
+ Long expectedHealthCheckInterval) throws Exception {
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
@@ -110,7 +111,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
codeFile,
pulsarServiceUrl,
stateStorageServiceUrl,
- authConfig);
+ authConfig,
+ expectedHealthCheckInterval);
}
@Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 0fe1f9f..ef2ea9c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,8 @@ public interface RuntimeFactory extends AutoCloseable {
* @return function container to start/stop instance
*/
Runtime createContainer(
- InstanceConfig instanceConfig, String codeFile) throws Exception;
+ InstanceConfig instanceConfig, String codeFile,
+ Long expectedHealthCheckInterval) throws Exception;
@Override
void close();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aad7d4a..00c09ae 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -74,7 +74,8 @@ public class RuntimeSpawner implements AutoCloseable {
throw new IllegalArgumentException("topics-pattern is not supported for python function");
}
- runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile);
+ runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+ instanceLivenessCheckFreqMs * 1000);
runtime.start();
// monitor function runtime to make sure it is running. If not, restart the function runtime
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index f9c9cff..84d21af 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -81,7 +81,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
}
@Override
- public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile) {
+ public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
+ Long expectedHealthCheckInterval) {
return new ThreadRuntime(
instanceConfig,
fnCache,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 6f1e2f3..97881a4 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,9 +114,9 @@ public class ProcessRuntimeTest {
public void testJavaConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
- ProcessRuntime container = factory.createContainer(config, userJarFile);
+ ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 26);
+ assertEquals(args.size(), 28);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +129,8 @@ public class ProcessRuntimeTest {
+ " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
- + " --state_storage_serviceurl " + stateStorageServiceUrl;
+ + " --state_storage_serviceurl " + stateStorageServiceUrl
+ + " --expected_healthcheck_interval 30";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -137,9 +138,9 @@ public class ProcessRuntimeTest {
public void testPythonConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
- ProcessRuntime container = factory.createContainer(config, userJarFile);
+ ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 22);
+ assertEquals(args.size(), 24);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -147,7 +148,8 @@ public class ProcessRuntimeTest {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(21);
+ + " --max_buffered_tuples 1024 --port " + args.get(21)
+ + " --expected_healthcheck_interval 30";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 5754477..05e3422 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public class FunctionActionerTest {
RuntimeFactory factory = mock(RuntimeFactory.class);
Runtime runtime = mock(Runtime.class);
- doReturn(runtime).when(factory).createContainer(any(), any());
+ doReturn(runtime).when(factory).createContainer(any(), any(), any());
doNothing().when(runtime).start();
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";