You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/04 01:47:15 UTC

[GitHub] jerrypeng closed pull request #3109: Consolidate timer threads in functions

jerrypeng closed pull request #3109: Consolidate timer threads in functions
URL: https://github.com/apache/pulsar/pull/3109
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
index f9a3c77c8f..9059f79b0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionStatsManager.java
@@ -393,6 +393,9 @@ public String getStatsAsString() throws IOException {
 
     @Override
     public void close() {
-        scheduledFuture.cancel(false);
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduledFuture = null;
+        }
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
index 937c27337b..fe7e049291 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceCache.java
@@ -18,17 +18,24 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.Getter;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 
 public class InstanceCache {
 
     private static InstanceCache instance;
 
-    public final ScheduledExecutorService executor;
+    @Getter
+    private final ScheduledExecutorService scheduledExecutorService;
 
     private InstanceCache() {
-        executor = Executors.newSingleThreadScheduledExecutor();
+        ThreadFactory namedThreadFactory =
+                new DefaultThreadFactory("function-timer-thread");
+        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(namedThreadFactory);
     }
 
     public static InstanceCache getInstanceCache() {
@@ -43,7 +50,7 @@ public static InstanceCache getInstanceCache() {
     public static void shutdown() {
         synchronized (InstanceCache.class) {
             if (instance != null) {
-                instance.executor.shutdown();
+                instance.scheduledExecutorService.shutdown();
             }
             instance = null;
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d302e6a8ea..e55fabca91 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -216,7 +216,7 @@ public void run() {
             if (this.collectorRegistry == null) {
                 this.collectorRegistry = new CollectorRegistry();
             }
-            this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.executor);
+            this.stats = new FunctionStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService());
 
             ContextImpl contextImpl = setupContext();
             javaInstance = setupJavaInstance(contextImpl);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index f0be5119bc..3a0a404e69 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -34,6 +34,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -49,6 +50,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -122,8 +124,8 @@
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
     private Long lastHealthCheckTs = null;
-    private ScheduledExecutorService timer;
     private HTTPServer metricsServer;
+    private ScheduledFuture healthCheckTimer;
 
     public JavaInstanceMain() { }
 
@@ -215,18 +217,14 @@ public void run() {
         metricsServer = new HTTPServer(new InetSocketAddress(metrics_port), collectorRegistry, true);
 
         if (expectedHealthCheckInterval > 0) {
-            timer = Executors.newSingleThreadScheduledExecutor();
-            timer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    try {
-                        if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
-                            log.info("Haven't received health check from spawner in a while. Stopping instance...");
-                            close();
-                        }
-                    } catch (Exception e) {
-                        log.error("Error occurred when checking for latest health check", e);
+            healthCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
+                try {
+                    if (System.currentTimeMillis() - lastHealthCheckTs > 3 * expectedHealthCheckInterval * 1000) {
+                        log.info("Haven't received health check from spawner in a while. Stopping instance...");
+                        close();
                     }
+                } catch (Exception e) {
+                    log.error("Error occurred when checking for latest health check", e);
                 }
             }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval * 1000, TimeUnit.MILLISECONDS);
         }
@@ -260,8 +258,8 @@ public void close() {
             if (runtimeSpawner != null) {
                 runtimeSpawner.close();
             }
-            if (timer != null) {
-                timer.shutdown();
+            if (healthCheckTimer != null) {
+                healthCheckTimer.cancel(false);
             }
             if (containerFactory != null) {
                 containerFactory.close();
@@ -269,6 +267,8 @@ public void close() {
             if (metricsServer != null) {
                 metricsServer.stop();
             }
+
+            InstanceCache.shutdown();
         } catch (Exception ex) {
             System.err.println(ex);
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 4edac1784b..70e7a3a494 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -30,6 +30,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -45,6 +46,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -64,7 +66,7 @@
     private Throwable deathException;
     private ManagedChannel channel;
     private InstanceControlGrpc.InstanceControlFutureStub stub;
-    private ScheduledExecutorService timer;
+    private ScheduledFuture timer;
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
@@ -138,19 +140,14 @@ public void start() {
                     .build();
             stub = InstanceControlGrpc.newFutureStub(channel);
 
-            timer = Executors.newSingleThreadScheduledExecutor();
-            timer.scheduleAtFixedRate(new TimerTask() {
-
-                @Override
-                public void run() {
-                    CompletableFuture<InstanceCommunication.HealthCheckResult> result = healthCheck();
-                    try {
-                        result.get();
-                    } catch (Exception e) {
-                        log.error("Health check failed for {}-{}",
-                                instanceConfig.getFunctionDetails().getName(),
-                                instanceConfig.getInstanceId(), e);
-                    }
+            timer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
+                CompletableFuture<InstanceCommunication.HealthCheckResult> result = healthCheck();
+                try {
+                    result.get();
+                } catch (Exception e) {
+                    log.error("Health check failed for {}-{}",
+                            instanceConfig.getFunctionDetails().getName(),
+                            instanceConfig.getInstanceId(), e);
                 }
             }, expectedHealthCheckInterval, expectedHealthCheckInterval, TimeUnit.SECONDS);
         }
@@ -164,7 +161,7 @@ public void join() throws Exception {
     @Override
     public void stop() {
         if (timer != null) {
-            timer.shutdown();
+            timer.cancel(false);
         }
         if (process != null) {
             process.destroyForcibly();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 6b5abce269..aa1784ab5a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -27,12 +27,15 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -50,7 +53,7 @@
 
     @Getter
     private Runtime runtime;
-    private Timer processLivenessCheckTimer;
+    private ScheduledFuture processLivenessCheckTimer;
     private int numRestarts;
     private long instanceLivenessCheckFreqMs;
     private Throwable runtimeDeathException;
@@ -79,27 +82,23 @@ public void start() throws Exception {
 
         // monitor function runtime to make sure it is running.  If not, restart the function runtime
         if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs > 0) {
-            processLivenessCheckTimer = new Timer();
-            processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    Runtime runtime = RuntimeSpawner.this.runtime;
-                    if (runtime != null && !runtime.isAlive()) {
-                        log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
-                                details.getNamespace(), details.getName(), runtime.getDeathException());
-                        // Just for the sake of sanity, just destroy the runtime
-                        try {
-                            runtime.stop();
-                            runtimeDeathException = runtime.getDeathException();
-                            runtime.start();
-                        } catch (Exception e) {
-                            log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
-                                    details.getNamespace(), details.getName(), e, e);
-                        }
-                        numRestarts++;
+            processLivenessCheckTimer = InstanceCache.getInstanceCache().getScheduledExecutorService().scheduleAtFixedRate(() -> {
+                Runtime runtime = RuntimeSpawner.this.runtime;
+                if (runtime != null && !runtime.isAlive()) {
+                    log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
+                            details.getNamespace(), details.getName(), runtime.getDeathException());
+                    // Just for the sake of sanity, just destroy the runtime
+                    try {
+                        runtime.stop();
+                        runtimeDeathException = runtime.getDeathException();
+                        runtime.start();
+                    } catch (Exception e) {
+                        log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
+                                details.getNamespace(), details.getName(), e, e);
                     }
+                    numRestarts++;
                 }
-            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs);
+            }, instanceLivenessCheckFreqMs, instanceLivenessCheckFreqMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -139,7 +138,7 @@ public void join() throws Exception {
     public void close() {
         // cancel liveness checker before stopping runtime.
         if (processLivenessCheckTimer != null) {
-            processLivenessCheckTimer.cancel();
+            processLivenessCheckTimer.cancel(false);
             processLivenessCheckTimer = null;
         }
         if (null != runtime) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services