You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/22 23:02:28 UTC

[GitHub] jerrypeng closed pull request #1828: instance worker health check

jerrypeng closed pull request #1828: instance worker health check
URL: https://github.com/apache/incubator-pulsar/pull/1828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise no longer show it once the PR is merged):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 2a0e1a22e9..88cf318247 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -146,6 +146,7 @@ private static boolean argsContains(String[] args, String arg) {
                 // worker talks to local broker
                 workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + brokerConfig.getBrokerServicePort());
                 workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + brokerConfig.getWebServicePort());
+                workerConfig.setWorkerPort(brokerConfig.getWebServicePort());
                 String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
                     brokerConfig.getAdvertisedAddress());
                 workerConfig.setWorkerHostname(hostname);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 641636afed..f6ba9f569c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -189,6 +189,7 @@ void start() throws Exception {
             // worker talks to local broker
             workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
             workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
+            workerConfig.setWorkerPort(config.getWebServicePort());
             String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
                 config.getAdvertisedAddress());
             workerConfig.setWorkerHostname(hostname);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index c22b6117b2..a08b9a8eba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -221,6 +221,20 @@ public Response getAssignments() {
         return functions.getAssignments();
     }
 
+    @GET
+    @ApiOperation(
+            value = "Fetches unqiue Id of worker",
+            response = String.class,
+            responseContainer = "String"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+    })
+    @Path("/id")
+    public Response getId() {
+        return functions.getId();
+    }
+
     @POST
     @ApiOperation(
             value = "Triggers a Pulsar Function with a user-specified value or file data",
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 9f9da7986a..4e73eeaf7f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -41,4 +41,6 @@
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
     private int port;
+    private String fullyQualifiedWorkerId;
+    private int workerPort;
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0c9c5bf3f9..26e6706778 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -74,6 +74,8 @@ def main():
   parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe')
   parser.add_argument('--sink_topic', required=False, help='Sink Topic')
   parser.add_argument('--sink_serde_classname', required=False, help='Sink SerDe classname')
+  parser.add_argument('--fully_qualified_worker_id', required=True, help='Unique Identifier for worker')
+  parser.add_argument('--worker_port', required=True, help='Service port of worker service')
 
   args = parser.parse_args()
   log_file = os.path.join(args.logging_directory,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 4e23038325..b10e55a1f2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -37,15 +37,25 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.lang.reflect.Type;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
+import java.net.URL;
 import java.util.Map;
+import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A function container implemented using java thread.
  */
 @Slf4j
-public class JavaInstanceMain {
+public class JavaInstanceMain implements AutoCloseable{
     @Parameter(names = "--function_classname", description = "Function Class Name\n", required = true)
     protected String className;
     @Parameter(
@@ -122,7 +132,16 @@
     @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
     protected String sinkSerdeClassName;
 
+    @Parameter(names = "--fully_qualified_worker_id", description = "Unique Identifier for worker\n", required = true)
+    protected String uniqueWorkerId;
+
+    @Parameter(names = "--worker_port", description = "Service port of worker service\n", required = true)
+    protected int workerPort;
+
     private Server server;
+    private RuntimeSpawner runtimeSpawner;
+    private ScheduledExecutorService timer;
+    private ScheduledFuture<?> handle;
 
     public JavaInstanceMain() { }
 
@@ -133,6 +152,8 @@ public void start() throws Exception {
         instanceConfig.setFunctionVersion(functionVersion);
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
+        instanceConfig.setFullyQualifiedWorkerId(uniqueWorkerId);
+        instanceConfig.setWorkerPort(workerPort);
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         functionDetailsBuilder.setTenant(tenant);
         functionDetailsBuilder.setNamespace(namespace);
@@ -186,12 +207,13 @@ public void start() throws Exception {
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
 
+
         ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory(
                 "LocalRunnerThreadGroup",
                 pulsarServiceUrl,
                 stateStorageServiceUrl);
 
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+        runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
                 containerFactory,
@@ -199,21 +221,52 @@ public void start() throws Exception {
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
-                .build()
-                .start();
-        log.info("JaveInstance Server started, listening on " + port);
-        java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                .build();
+
+        // monitor worker that spawned this process
+        timer = Executors.newSingleThreadScheduledExecutor();
+        handle = timer.scheduleAtFixedRate(new TimerTask() {
             @Override
             public void run() {
-                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+                String uniqueWorkerId = null;
                 try {
-                    server.shutdown();
-                    runtimeSpawner.close();
+                    URL url = new URL(String.format("http://127.0.0.1:%s/admin/functions/id",
+                            instanceConfig.getWorkerPort()));
+                    HttpURLConnection con = (HttpURLConnection) url.openConnection();
+                    con.setConnectTimeout(30000);
+                    con.setReadTimeout(30000);
+                    con.setRequestMethod("GET");
+                    BufferedReader in = new BufferedReader(
+                            new InputStreamReader(con.getInputStream()));
+                    String inputLine;
+                    StringBuffer content = new StringBuffer();
+                    while ((inputLine = in.readLine()) != null) {
+                        content.append(inputLine);
+                    }
+                    in.close();
+
+                    uniqueWorkerId = content.toString();
+
+                } catch (SocketTimeoutException | ConnectException ex) {
+                    log.info("Could not connect to worker.  Shutting down...");
+                    close();
+                    return;
                 } catch (Exception ex) {
-                    System.err.println(ex);
+                    log.error("Exception occurred when getting worker ID", ex);
+                    return;
+                }
+                if (!uniqueWorkerId.equals(instanceConfig.getFullyQualifiedWorkerId())) {
+                    log.info("There is a discrepency in worker ID. Expected worker ID: {} Actual worker ID: {}. New worker process might have been started. Shutting down...", instanceConfig.getFullyQualifiedWorkerId(), uniqueWorkerId);
+                    close();
+                    return;
                 }
             }
-        });
+        }, 30, 30, TimeUnit.SECONDS);
+
+        // start server
+        server.start();
+        log.info("JaveInstance Server started, listening on " + port);
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> close()));
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
         runtimeSpawner.join();
@@ -231,6 +284,27 @@ public static void main(String[] args) throws Exception {
         javaInstanceMain.start();
     }
 
+    @Override
+    public void close() {
+        try {
+            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+            if (server != null) {
+                server.shutdown();
+            }
+            if (runtimeSpawner != null) {
+                runtimeSpawner.close();
+            }
+            if (handle != null) {
+                handle.cancel(true);
+            }
+            if (timer != null) {
+                timer.shutdown();
+            }
+        } catch (Exception ex) {
+            System.err.println(ex);
+        }
+    }
+
     static class InstanceControlImpl extends InstanceControlGrpc.InstanceControlImplBase {
         private RuntimeSpawner runtimeSpawner;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 5b77eb277c..d62c89cae9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -26,7 +26,6 @@
 import com.google.protobuf.Empty;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
-import java.util.concurrent.ExecutionException;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -35,12 +34,9 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 
-import java.io.IOException;
 import java.io.InputStream;
-import java.net.ServerSocket;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.functions.proto.InstanceControlGrpc.InstanceControlFutureStub;
 
 /**
  * A function container implemented using java thread.
@@ -190,6 +186,11 @@
             args.add("--sink_serde_classname");
             args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
         }
+
+        args.add("--fully_qualified_worker_id");
+        args.add(instanceConfig.getFullyQualifiedWorkerId());
+        args.add("--worker_port");
+        args.add(Integer.toString(instanceConfig.getWorkerPort()));
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 0994adcc73..cf911e1f38 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -118,6 +118,9 @@ public void close() {
             runtime.stop();
             runtime = null;
         }
+        if (runtimeFactory != null) {
+            runtimeFactory.close();
+        }
         if (processLivenessCheckTimer != null) {
             processLivenessCheckTimer.cancel();
             processLivenessCheckTimer = null;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 5a3732889d..c2d601457d 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -104,6 +104,8 @@ InstanceConfig createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
         config.setFunctionVersion("1.0");
         config.setInstanceId(java.util.UUID.randomUUID().toString());
         config.setMaxBufferedTuples(1024);
+        config.setFullyQualifiedWorkerId("my-fully-qualified-worker-id");
+        config.setWorkerPort(8080);
 
         return config;
     }
@@ -114,7 +116,7 @@ public void testJavaConstructor() {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 51);
+        assertEquals(args.size(), 55);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -136,7 +138,9 @@ public void testJavaConstructor() {
                 + " --sink_classname " + config.getFunctionDetails().getSink().getClassName()
                 + " --sink_type_classname " + config.getFunctionDetails().getSink().getTypeClassName()
                 + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
+                + " --fully_qualified_worker_id " + config.getFullyQualifiedWorkerId()
+                + " --worker_port " + config.getWorkerPort();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
@@ -146,7 +150,7 @@ public void testPythonConstructor() {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 42);
+        assertEquals(args.size(), 46);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
@@ -163,7 +167,9 @@ public void testPythonConstructor() {
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
                 + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
                 + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
+                + " --fully_qualified_worker_id " + config.getFullyQualifiedWorkerId()
+                + " --worker_port " + config.getWorkerPort();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 5586114c59..ace0c63fda 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -55,11 +55,12 @@
     private LinkedBlockingQueue<FunctionAction> actionQueue;
     private volatile boolean running;
     private Thread actioner;
+    private String fullyQualifiedWorkerId;
 
     public FunctionActioner(WorkerConfig workerConfig,
                             RuntimeFactory runtimeFactory,
                             Namespace dlogNamespace,
-                            LinkedBlockingQueue<FunctionAction> actionQueue) {
+                            LinkedBlockingQueue<FunctionAction> actionQueue, String fullyQualifiedWorkerId) {
         this.workerConfig = workerConfig;
         this.runtimeFactory = runtimeFactory;
         this.dlogNamespace = dlogNamespace;
@@ -85,6 +86,7 @@ public FunctionActioner(WorkerConfig workerConfig,
             }
         });
         actioner.setName("FunctionActionerThread");
+        this.fullyQualifiedWorkerId = fullyQualifiedWorkerId;
     }
 
     public void start() {
@@ -165,6 +167,8 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
         instanceConfig.setInstanceId(String.valueOf(instanceId));
         instanceConfig.setMaxBufferedTuples(1024);
         instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+        instanceConfig.setFullyQualifiedWorkerId(fullyQualifiedWorkerId);
+        instanceConfig.setWorkerPort(workerConfig.getWorkerPort());
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index af23992308..57319c0716 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -110,7 +110,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig,
         this.actionQueue = new LinkedBlockingQueue<>();
 
         this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
-                dlogNamespace, actionQueue);
+                dlogNamespace, actionQueue, membershipManager.getWorkerInfo().getFullyQualifiedWorkerId());
 
         this.membershipManager = membershipManager;
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index dde017fe96..7241f24d42 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -19,25 +19,11 @@
 package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -50,6 +36,18 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 /**
  * A simple implementation of leader election using a pulsar topic.
  */
@@ -62,6 +60,7 @@
     private PulsarAdmin pulsarAdminClient;
     private final CompletableFuture<Void> firstConsumerEventFuture;
     private final AtomicBoolean isLeader = new AtomicBoolean();
+    private final WorkerInfo myWorkerInfo;
 
     static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
 
@@ -75,12 +74,13 @@
     MembershipManager(WorkerConfig workerConfig, PulsarClient client)
             throws PulsarClientException {
         this.workerConfig = workerConfig;
-        consumerName = String.format(
-            "%s:%s:%d",
-            workerConfig.getWorkerId(),
-            workerConfig.getWorkerHostname(),
-            workerConfig.getWorkerPort()
+        myWorkerInfo = WorkerInfo.of(
+                workerConfig.getWorkerId(),
+                workerConfig.getWorkerHostname(),
+                workerConfig.getWorkerPort(),
+                System.currentTimeMillis()
         );
+        consumerName = myWorkerInfo.getFullyQualifiedWorkerId();
         firstConsumerEventFuture = new CompletableFuture<>();
         // the membership manager is using a `coordination` topic for leader election.
         // we don't produce any messages into this topic, we only use the `failover` subscription
@@ -151,23 +151,39 @@ public void close() throws PulsarClientException {
         private String workerId;
         private String workerHostname;
         private int port;
+        private long timestamp;
 
-        public static WorkerInfo of (String workerId, String workerHostname, int port) {
-            return new WorkerInfo(workerId, workerHostname, port);
+        public static WorkerInfo of (String workerId, String workerHostname, int port, long timestamp) {
+            return new WorkerInfo(workerId, workerHostname, port, timestamp);
         }
 
         public static WorkerInfo parseFrom(String str) {
             String[] tokens = str.split(":");
-            if (tokens.length != 3) {
+            if (tokens.length != 4) {
                 throw new IllegalArgumentException("Invalid string to parse WorkerInfo : " + str);
             }
 
             String workerId = tokens[0];
             String workerHostname = tokens[1];
             int port = Integer.parseInt(tokens[2]);
+            Long timestamp = Long.parseLong(tokens[3]);
 
-            return new WorkerInfo(workerId, workerHostname, port);
+            return new WorkerInfo(workerId, workerHostname, port, timestamp);
         }
+
+        public String getFullyQualifiedWorkerId() {
+            return String.format(
+                    "%s:%s:%d:%d",
+                    workerId,
+                    workerHostname,
+                    port,
+                    timestamp
+            );
+        }
+    }
+
+    public WorkerInfo getWorkerInfo() {
+        return myWorkerInfo;
     }
 
     public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index d306efa1bb..a3fa5e3ea2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -41,6 +41,7 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
@@ -55,7 +56,6 @@
 import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
@@ -488,6 +488,18 @@ public Response getAssignments() {
                 new Gson().toJson(ret)).build();
     }
 
+    @GET
+    @Path("/id")
+    public Response getId() {
+
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        return Response.status(Status.OK).entity(
+                worker().getMembershipManager().getWorkerInfo().getFullyQualifiedWorkerId()).build();
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 00c2a2836b..ca64af1201 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -130,6 +130,12 @@ public Response getAssignments() {
         return functions.getAssignments();
     }
 
+    @GET
+    @Path("/id")
+    public Response getId() {
+        return functions.getId();
+    }
+
     @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 690f474150..ee9e5ad36d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -87,12 +87,18 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
 
+        MembershipManager membershipManager = mock(MembershipManager.class);
+        doReturn(
+                MembershipManager.WorkerInfo.of("id",
+                        "hostname", 5555, System.currentTimeMillis()))
+                .when(membershipManager).getWorkerInfo();
+
         // test new assignment add functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
 
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -180,12 +186,18 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
 
+        MembershipManager membershipManager = mock(MembershipManager.class);
+        doReturn(
+                MembershipManager.WorkerInfo.of("id",
+                        "hostname", 5555, System.currentTimeMillis()))
+                .when(membershipManager).getWorkerInfo();
+
         // test new assignment delete functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
 
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
@@ -277,12 +289,18 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
 
+        MembershipManager membershipManager = mock(MembershipManager.class);
+        doReturn(
+                MembershipManager.WorkerInfo.of("id",
+                        "hostname", 5555, System.currentTimeMillis()))
+                .when(membershipManager).getWorkerInfo();
+
         // test new assignment update functions
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
 
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 6dd3fa3c3c..82f12c9a02 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -131,18 +131,19 @@ public void testCheckFailuresNoFailures() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+
+        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
-        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000));
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-2", "host-2", 8001));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000, System.currentTimeMillis()));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-2", "host-2", 8001, System.currentTimeMillis()));
 
         Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
@@ -195,18 +196,18 @@ public void testCheckFailuresSomeFailures() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+
+        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
 
-        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
-
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000, System.currentTimeMillis()));
 
         Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
@@ -284,18 +285,19 @@ public void testCheckFailuresSomeUnassigned() throws Exception {
         doReturn(readerBuilder).when(readerBuilder).topic(anyString());
         doReturn(readerBuilder).when(readerBuilder).startMessageId(any());
         doReturn(mock(Reader.class)).when(readerBuilder).create();
+
+        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
+        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
         FunctionRuntimeManager functionRuntimeManager = spy(new FunctionRuntimeManager(
                 workerConfig,
                 pulsarClient,
                 mock(Namespace.class),
-                mock(MembershipManager.class)
+                membershipManager
         ));
-        FunctionMetaDataManager functionMetaDataManager = mock(FunctionMetaDataManager.class);
-        MembershipManager membershipManager = spy(new MembershipManager(workerConfig, mockPulsarClient()));
 
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000));
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-2", "host-2", 8001));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "host-1", 8000, System.currentTimeMillis()));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-2", "host-2", 8001, System.currentTimeMillis()));
 
         Mockito.doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index b5b4e981ec..c57a3cb962 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -146,7 +146,7 @@ public void testSchedule() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am not leader
@@ -190,7 +190,7 @@ public void testNothingNewToSchedule() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
@@ -246,7 +246,7 @@ public void testAddingFunctions() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
@@ -317,7 +317,7 @@ public void testDeletingFunctions() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
@@ -376,7 +376,7 @@ public void testScalingUp() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
@@ -482,7 +482,7 @@ public void testScalingDown() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader
@@ -591,7 +591,7 @@ public void testUpdate() throws Exception {
 
         // single node
         List<MembershipManager.WorkerInfo> workerInfoList = new LinkedList<>();
-        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000));
+        workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000, System.currentTimeMillis()));
         doReturn(workerInfoList).when(membershipManager).getCurrentMembership();
 
         // i am leader


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services