You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/04 22:00:43 UTC

[incubator-pulsar] branch master updated: Make Function port part of InstanceConfig (#1731)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a42419  Make Function port part of InstanceConfig (#1731)
6a42419 is described below

commit 6a42419bd20350d1814168ed4df33439aa112a9f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 4 15:00:40 2018 -0700

    Make Function port part of InstanceConfig (#1731)
    
    * Made port part of instance config
    
    * Put back worker dep
---
 .../java/org/apache/pulsar/admin/cli/CmdFunctions.java |  1 +
 .../pulsar/functions/instance/InstanceConfig.java      |  1 +
 .../pulsar/functions/runtime/JavaInstanceMain.java     |  1 +
 .../pulsar/functions/runtime/ProcessRuntime.java       | 18 ++----------------
 .../java/org/apache/pulsar/functions/utils/Utils.java  | 15 +++++++++++++++
 .../pulsar/functions/worker/FunctionActioner.java      |  1 +
 6 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index df15fc9..15d0166 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -638,6 +638,7 @@ public class CmdFunctions extends CmdBase {
                     instanceConfig.setFunctionId(UUID.randomUUID().toString());
                     instanceConfig.setInstanceId(Integer.toString(i));
                     instanceConfig.setMaxBufferedTuples(1024);
+                    instanceConfig.setPort(Utils.findAvailablePort());
                     RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                             instanceConfig,
                             userCodeFile,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 5f26e43..9f9da79 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -40,4 +40,5 @@ public class InstanceConfig {
     private String functionVersion;
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
+    private int port;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 5bb5cac..feaecd1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -175,6 +175,7 @@ public class JavaInstanceMain {
 
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
+        instanceConfig.setPort(port);
 
         ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory(
                 "LocalRunnerThreadGroup",
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 72a5801..8428187 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -64,6 +64,7 @@ class ProcessRuntime implements Runtime {
                    String logDirectory,
                    String codeFile,
                    String pulsarServiceUrl) {
+        this.instancePort = instanceConfig.getPort();
         this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl);
     }
 
@@ -130,9 +131,8 @@ class ProcessRuntime implements Runtime {
             args.add("--user_config");
             args.add(new Gson().toJson(userConfig));
         }
-        instancePort = findAvailablePort();
         args.add("--port");
-        args.add(String.valueOf(instancePort));
+        args.add(String.valueOf(instanceConfig.getPort()));
 
         // source related configs
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
@@ -256,20 +256,6 @@ class ProcessRuntime implements Runtime {
         return retval;
     }
 
-    private int findAvailablePort() {
-        // The logic here is a little flaky. There is no guarantee that this
-        // port returned will be available later on when the instance starts
-        // TODO(sanjeev):- Fix this
-        try {
-            ServerSocket socket = new ServerSocket(0);
-            int port = socket.getLocalPort();
-            socket.close();
-            return port;
-        } catch (IOException ex){
-            throw new RuntimeException("No free port found", ex);
-        }
-    }
-
     private void startProcess() {
         deathException = null;
         try {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 7a15893..68ac86b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -22,6 +22,8 @@ import com.google.protobuf.AbstractMessage.Builder;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
 import java.io.IOException;
+import java.net.ServerSocket;
+
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.pulsar.client.api.MessageId;
@@ -61,4 +63,17 @@ public class Utils {
         JsonFormat.parser().merge(json, builder);
     }
 
+    public static int findAvailablePort() {
+        // Best effort only: binding to port 0 lets the system pick a free port, which
+        // is read and then released by close(). Nothing reserves it afterwards, so it
+        // may already be taken when the instance starts. TODO(sanjeev):- Fix this
+        try {
+            ServerSocket socket = new ServerSocket(0);
+            int port = socket.getLocalPort();
+            socket.close();
+            return port;
+        } catch (IOException ex){
+            throw new RuntimeException("No free port found", ex);
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index e6821f6..5586114 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -164,6 +164,7 @@ public class FunctionActioner implements AutoCloseable {
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
         instanceConfig.setInstanceId(String.valueOf(instanceId));
         instanceConfig.setMaxBufferedTuples(1024);
+        instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.