You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/08 00:31:44 UTC

[incubator-pulsar] branch master updated: Print periodic status of instances in localrun (#1734)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cb648d2  Print periodic status of instances in localrun (#1734)
cb648d2 is described below

commit cb648d2e4705e19ba8bc607214fe99d0de23c35e
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 7 17:31:41 2018 -0700

    Print periodic status of instances in localrun (#1734)
    
    * Log status of running instances periodically
    
    * Fix shading conflicts
    
    * Add shutdown hook to cancel the timer
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 37 ++++++++++++++++++----
 .../pulsar/functions/runtime/RuntimeSpawner.java   | 12 +++++++
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c0d8858..3e6201e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -29,13 +29,10 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -1014,6 +1011,32 @@ public class CmdFunctions extends CmdBase {
                     }
                 }
             });
+            Timer statusCheckTimer = new Timer();
+            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+                    @Override
+                    public void run() {
+                        CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
+                        int index = 0;
+                        for (RuntimeSpawner spawner : spawners) {
+                            futures[index++] = spawner.getFunctionStatusAsJson();
+                        }
+                        try {
+                            CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                            for (index = 0; index < futures.length; ++index) {
+                                String json = futures[index].get();
+                                Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                                log.info(gson.toJson(new JsonParser().parse(json)));
+                            }
+                        } catch (Exception ex) {
+                            log.error("Could not get status from all local instances");
+                        }
+                    }
+                }, 30000, 30000);
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                    public void run() {
+                        statusCheckTimer.cancel();
+                    }
+                });
             for (RuntimeSpawner spawner : spawners) {
                 spawner.join();
                 log.info("RuntimeSpawner quit because of {}", spawner.getRuntime().getDeathException());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index ede9056..0994adc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -23,6 +23,7 @@
  */
 package org.apache.pulsar.functions.runtime;
 
+import java.io.IOException;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
@@ -31,6 +32,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.utils.Utils;
 
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
@@ -100,6 +102,16 @@ public class RuntimeSpawner implements AutoCloseable {
         });
     }
 
+    public CompletableFuture<String> getFunctionStatusAsJson() {
+        return this.getFunctionStatus().thenApply(msg -> {
+            try {
+                return Utils.printJson(msg);
+            } catch (IOException e) {
+                throw new RuntimeException("Exception parsing getstatus", e);
+            }
+        });
+    }
+
     @Override
     public void close() {
         if (null != runtime) {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.