You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/08 00:31:44 UTC
[incubator-pulsar] branch master updated: Print periodic status of
instances in localrun (#1734)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cb648d2 Print periodic status of instances in localrun (#1734)
cb648d2 is described below
commit cb648d2e4705e19ba8bc607214fe99d0de23c35e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 7 17:31:41 2018 -0700
Print periodic status of instances in localrun (#1734)
* Log status of running instances periodically
* Fix shading conflicts
* Add shutdown hook to cancel the timer
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 37 ++++++++++++++++++----
.../pulsar/functions/runtime/RuntimeSpawner.java | 12 +++++++
2 files changed, 42 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c0d8858..3e6201e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -29,13 +29,10 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -1014,6 +1011,32 @@ public class CmdFunctions extends CmdBase {
}
}
});
+ Timer statusCheckTimer = new Timer();
+ statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
+ int index = 0;
+ for (RuntimeSpawner spawner : spawners) {
+ futures[index++] = spawner.getFunctionStatusAsJson();
+ }
+ try {
+ CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+ for (index = 0; index < futures.length; ++index) {
+ String json = futures[index].get();
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ log.info(gson.toJson(new JsonParser().parse(json)));
+ }
+ } catch (Exception ex) {
+ log.error("Could not get status from all local instances");
+ }
+ }
+ }, 30000, 30000);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ statusCheckTimer.cancel();
+ }
+ });
for (RuntimeSpawner spawner : spawners) {
spawner.join();
log.info("RuntimeSpawner quit because of {}", spawner.getRuntime().getDeathException());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index ede9056..0994adc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -23,6 +23,7 @@
*/
package org.apache.pulsar.functions.runtime;
+import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.utils.Utils;
@Slf4j
public class RuntimeSpawner implements AutoCloseable {
@@ -100,6 +102,16 @@ public class RuntimeSpawner implements AutoCloseable {
});
}
+ public CompletableFuture<String> getFunctionStatusAsJson() {
+ return this.getFunctionStatus().thenApply(msg -> {
+ try {
+ return Utils.printJson(msg);
+ } catch (IOException e) {
+ throw new RuntimeException("Exception parsing getstatus", e);
+ }
+ });
+ }
+
@Override
public void close() {
if (null != runtime) {
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.