You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/04 22:00:43 UTC
[incubator-pulsar] branch master updated: Make Function port part
of InstanceConfig (#1731)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6a42419 Make Function port part of InstanceConfig (#1731)
6a42419 is described below
commit 6a42419bd20350d1814168ed4df33439aa112a9f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 4 15:00:40 2018 -0700
Make Function port part of InstanceConfig (#1731)
* Made port part of instance config
* Put back worker dep
---
.../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 1 +
.../pulsar/functions/instance/InstanceConfig.java | 1 +
.../pulsar/functions/runtime/JavaInstanceMain.java | 1 +
.../pulsar/functions/runtime/ProcessRuntime.java | 18 ++----------------
.../java/org/apache/pulsar/functions/utils/Utils.java | 15 +++++++++++++++
.../pulsar/functions/worker/FunctionActioner.java | 1 +
6 files changed, 21 insertions(+), 16 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index df15fc9..15d0166 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -638,6 +638,7 @@ public class CmdFunctions extends CmdBase {
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(Integer.toString(i));
instanceConfig.setMaxBufferedTuples(1024);
+ instanceConfig.setPort(Utils.findAvailablePort());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 5f26e43..9f9da79 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -40,4 +40,5 @@ public class InstanceConfig {
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
+ private int port;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 5bb5cac..feaecd1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -175,6 +175,7 @@ public class JavaInstanceMain {
FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
+ instanceConfig.setPort(port);
ThreadRuntimeFactory containerFactory = new ThreadRuntimeFactory(
"LocalRunnerThreadGroup",
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 72a5801..8428187 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -64,6 +64,7 @@ class ProcessRuntime implements Runtime {
String logDirectory,
String codeFile,
String pulsarServiceUrl) {
+ this.instancePort = instanceConfig.getPort();
this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl);
}
@@ -130,9 +131,8 @@ class ProcessRuntime implements Runtime {
args.add("--user_config");
args.add(new Gson().toJson(userConfig));
}
- instancePort = findAvailablePort();
args.add("--port");
- args.add(String.valueOf(instancePort));
+ args.add(String.valueOf(instanceConfig.getPort()));
// source related configs
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
@@ -256,20 +256,6 @@ class ProcessRuntime implements Runtime {
return retval;
}
- private int findAvailablePort() {
- // The logic here is a little flaky. There is no guarantee that this
- // port returned will be available later on when the instance starts
- // TODO(sanjeev):- Fix this
- try {
- ServerSocket socket = new ServerSocket(0);
- int port = socket.getLocalPort();
- socket.close();
- return port;
- } catch (IOException ex){
- throw new RuntimeException("No free port found", ex);
- }
- }
-
private void startProcess() {
deathException = null;
try {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 7a15893..68ac86b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -22,6 +22,8 @@ import com.google.protobuf.AbstractMessage.Builder;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
+import java.net.ServerSocket;
+
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.MessageId;
@@ -61,4 +63,17 @@ public class Utils {
JsonFormat.parser().merge(json, builder);
}
+ public static int findAvailablePort() {
+ // The logic here is a little flaky. There is no guarantee that this
+ // port returned will be available later on when the instance starts
+ // TODO(sanjeev):- Fix this
+ try {
+ ServerSocket socket = new ServerSocket(0);
+ int port = socket.getLocalPort();
+ socket.close();
+ return port;
+ } catch (IOException ex){
+ throw new RuntimeException("No free port found", ex);
+ }
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index e6821f6..5586114 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -164,6 +164,7 @@ public class FunctionActioner implements AutoCloseable {
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(String.valueOf(instanceId));
instanceConfig.setMaxBufferedTuples(1024);
+ instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.