You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/19 17:42:16 UTC
[pulsar] branch master updated: Implementing authentication for
Pulsar Functions (#3735)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b50760c Implementing authentication for Pulsar Functions (#3735)
b50760c is described below
commit b50760ca42d97ca16e7fd33329e5f74e0775d926
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Mar 19 11:42:11 2019 -0600
Implementing authentication for Pulsar Functions (#3735)
* Implementing authentication for Pulsar Functions
* delete unnecessary changes
* cleaning up
* improving implementation
* fixing tests
* cleaning up
* add no op implementation
* cleaning up unnecessary changes
* refactoring based on comments
* adding comments
* change data from string type to bytes
* add proto file
* addressing comments
* up merging
* refactoring get token code
* cleaning up
* fix bugs and add tests
* add tests
* remove service account creation
* cleanup unused imports
* add field for auth provider
* adding comments
---
.../AuthenticationProviderToken.java | 4 +-
.../pulsar/broker/admin/impl/FunctionsBase.java | 2 +-
.../apache/pulsar/broker/admin/impl/SinkBase.java | 2 +-
.../pulsar/broker/admin/impl/SourceBase.java | 2 +-
.../pulsar/functions/instance/InstanceConfig.java | 2 +
.../proto/src/main/proto/Function.proto | 15 ++
pulsar-functions/runtime/pom.xml | 6 +
.../auth/ClearTextFunctionTokenAuthProvider.java | 55 +++++
.../pulsar/functions/auth/FunctionAuthData.java} | 43 ++--
.../functions/auth/FunctionAuthProvider.java | 59 +++++
.../pulsar/functions/auth/FunctionAuthUtils.java | 28 +++
.../KubernetesFunctionAuthProvider.java} | 40 +---
.../auth/KubernetesSecretsTokenAuthProvider.java | 248 +++++++++++++++++++++
.../functions/auth/NoOpFunctionAuthProvider.java | 43 ++++
.../functions/runtime/KubernetesRuntime.java | 83 +++++--
.../runtime/KubernetesRuntimeFactory.java | 35 ++-
.../functions/runtime/ProcessRuntimeFactory.java | 8 +
.../apache/pulsar/functions/runtime/Runtime.java | 4 +
.../pulsar/functions/runtime/RuntimeFactory.java | 7 +
.../ClearTextFunctionTokenAuthProviderTest.java | 63 ++++++
.../KubernetesSecretsTokenAuthProviderTest.java | 107 +++++++++
.../runtime/KubernetesRuntimeFactoryTest.java | 2 -
.../pulsar/functions/worker/FunctionActioner.java | 60 +++--
.../functions/worker/rest/FunctionApiResource.java | 5 +
.../functions/worker/rest/api/ComponentImpl.java | 40 +++-
.../functions/worker/rest/api/FunctionsImplV2.java | 2 +-
.../worker/rest/api/v3/FunctionApiV3Resource.java | 2 +-
.../worker/rest/api/v3/SinkApiV3Resource.java | 4 +-
.../worker/rest/api/v3/SourceApiV3Resource.java | 2 +-
.../worker/FunctionRuntimeManagerTest.java | 15 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 8 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 6 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 6 +-
33 files changed, 872 insertions(+), 136 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 2af43f2..f5ee57c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -76,7 +76,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
return parseToken(token);
}
- private String getToken(AuthenticationDataSource authData) throws AuthenticationException {
+ public static String getToken(AuthenticationDataSource authData) throws AuthenticationException {
if (authData.hasDataFromCommand()) {
// Authenticate Pulsar binary connection
return authData.getCommandData();
@@ -96,7 +96,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
}
}
- private String validateToken(final String token) throws AuthenticationException {
+ private static String validateToken(final String token) throws AuthenticationException {
if (StringUtils.isNotBlank(token)) {
return token;
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 9b88f29..c4fd33f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,7 +82,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, functionConfigJson, clientAppId());
+ functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2bd22a4..ffd4dd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -79,7 +79,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 0e8348f..6078ede 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -79,7 +79,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sourceConfigJson, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
@PUT
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 023f6a2..f823728 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -23,6 +23,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
/**
@@ -40,6 +41,7 @@ public class InstanceConfig {
private String functionVersion;
private FunctionDetails functionDetails;
private int maxBufferedTuples;
+ private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
private int port;
private String clusterName;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 8c93b3a..fba37e4 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -141,6 +141,21 @@ message FunctionMetaData {
uint64 version = 3;
uint64 createTime = 4;
map<int32, FunctionState> instanceStates = 5;
+ FunctionAuthenticationSpec functionAuthSpec = 6;
+}
+
+message FunctionAuthenticationSpec {
+ /**
+ * function authentication related data that the function authentication provider
+ * needs to cache/distribute to all workers support function authentication.
+ * Depending on the function authentication provider implementation, this can be the actual auth credentials
+ * or a pointer to the auth credentials that this function should use
+ */
+ bytes data = 1;
+ /**
+ * classname of the function auth provicer this data is relevant to
+ */
+ string provider = 2;
}
message Instance {
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index f3176a7..ca363c8 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -63,6 +63,12 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-broker-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
new file mode 100644
index 0000000..7adf5ed
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
+
+public class ClearTextFunctionTokenAuthProvider implements FunctionAuthProvider {
+ @Override
+ public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+ authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ authConfig.setClientAuthenticationParameters("token:" + new String(functionAuthData.getData()));
+ }
+
+ @Override
+ public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception {
+ String token = null;
+ try {
+ token = getToken(authenticationDataSource);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (token != null) {
+ return Optional.of(FunctionAuthData.builder().data(token.getBytes()).build());
+ }
+ return null;
+ }
+
+ @Override
+ public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+ //no-op
+ }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
similarity index 50%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
copy to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
index 023f6a2..be17668 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
@@ -16,39 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.auth;
+import lombok.Builder;
import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+@Data
+@Builder
/**
- * This is the config passed to the Java Instance. Contains all the information
- * passed to run functions.
+ * A wrapper for authentication data for functions
*/
-@Data
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class InstanceConfig {
- private int instanceId;
- private String functionId;
- private String functionVersion;
- private FunctionDetails functionDetails;
- private int maxBufferedTuples;
- private int port;
- private String clusterName;
-
+public class FunctionAuthData {
+ /**
+ * function authentication related data that the function authentication provider
+ * needs to cache/distribute to all workers support function authentication.
+ * Depending on the function authentication provider implementation, this can be the actual auth credentials
+ * or a pointer to the auth credentials that this function should use
+ */
+ private byte[] data;
/**
- * Get the string representation of {@link #getInstanceId()}.
- *
- * @return the string representation of {@link #getInstanceId()}.
+ * classname of the function auth provicer this data is relevant to
*/
- public String getInstanceName() {
- return "" + instanceId;
- }
+ private String provider;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
new file mode 100644
index 0000000..e30e9ad
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+/**
+ * This is a generic interface that functions can use to cache and distribute appropriate authentication
+ * data that is needed to configure the runtime of functions to support appropriate authentication of function instances
+ */
+public interface FunctionAuthProvider {
+
+ /**
+ * Set authentication configs for function instance based on the data in FunctionAuthenticationSpec
+ * @param authConfig authentication configs passed to the function instance
+ * @param functionAuthData function authentication data that is provider specific
+ */
+ void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData);
+
+ /**
+ * Cache auth data in as part of function metadata for function that runtime may need to configure authentication
+ * @param tenant tenant that the function is running under
+ * @param namespace namespace that is the function is running under
+ * @param name name of the function
+ * @param authenticationDataSource auth data
+ * @return
+ * @throws Exception
+ */
+ Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception;
+
+ /**
+ * Clean up operation for auth when function is terminated
+ * @param tenant tenant that the function is running under
+ * @param namespace namespace that is the function is running under
+ * @param name name of the function
+ * @param functionAuthData function auth data
+ * @throws Exception
+ */
+ void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java
new file mode 100644
index 0000000..3356706
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.functions.proto.Function;
+
+public final class FunctionAuthUtils {
+
+ public static final FunctionAuthData getFunctionAuthData(Function.FunctionAuthenticationSpec functionAuthenticationSpec) {
+ return FunctionAuthData.builder().data(functionAuthenticationSpec.getData().toByteArray()).build();
+ }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
similarity index 50%
copy from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
copy to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index 19a5fc9..8b42c64 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -16,36 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.functions.auth;
-package org.apache.pulsar.functions.runtime;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
+import io.kubernetes.client.models.V1ServiceAccount;
+import io.kubernetes.client.models.V1StatefulSet;
/**
- * A function container is an environment for invoking functions.
+ * Kubernetes runtime specific functions authentication provider
*/
-public interface Runtime {
-
- void start() throws Exception;
-
- void join() throws Exception;
-
- void stop() throws Exception;
-
- boolean isAlive();
-
- Throwable getDeathException();
-
- CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId);
-
- CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
-
- CompletableFuture<Void> resetMetrics();
-
- CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId);
-
- String getPrometheusMetrics() throws IOException;
+public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
+
+ /**
+ * Configure function statefulset spec based on function auth data
+ * @param statefulSet statefulset spec for function
+ * @param functionAuthData function auth data
+ */
+ void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
new file mode 100644
index 0000000..ff2cd57
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1Secret;
+import io.kubernetes.client.models.V1SecretVolumeSource;
+import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.models.V1Volume;
+import io.kubernetes.client.models.V1VolumeMount;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.utils.Actions;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
+
+@Slf4j
+public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAuthProvider {
+
+ private static final int NUM_RETRIES = 5;
+ private static final long SLEEP_BETWEEN_RETRIES_MS = 500;
+ private static final String SECRET_NAME = "function-auth";
+ private static final String DEFAULT_SECRET_MOUNT_DIR = "/etc/auth";
+ private static final String FUNCTION_AUTH_TOKEN = "token";
+
+
+ private final CoreV1Api coreClient;
+ private final String kubeNamespace;
+
+ public KubernetesSecretsTokenAuthProvider(CoreV1Api coreClient, String kubeNamespace) {
+ this.coreClient = coreClient;
+ this.kubeNamespace = kubeNamespace;
+ }
+
+ @Override
+ public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData) {
+
+ V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
+
+ // configure pod mount secret with auth token
+ podSpec.setVolumes(Collections.singletonList(
+ new V1Volume()
+ .name(SECRET_NAME)
+ .secret(
+ new V1SecretVolumeSource()
+ .secretName(getSecretName(new String(functionAuthData.getData())))
+ .defaultMode(256))));
+
+ podSpec.getContainers().forEach(container -> container.setVolumeMounts(Collections.singletonList(
+ new V1VolumeMount()
+ .name(SECRET_NAME)
+ .mountPath(DEFAULT_SECRET_MOUNT_DIR)
+ .readOnly(true))));
+
+ }
+
+ @Override
+ public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+ authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ authConfig.setClientAuthenticationParameters(String.format("file://%s/%s", DEFAULT_SECRET_MOUNT_DIR, FUNCTION_AUTH_TOKEN));
+ }
+
+
+ @Override
+ public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
+ AuthenticationDataSource authenticationDataSource) {
+ String id = null;
+ try {
+ String token = getToken(authenticationDataSource);
+ if (token != null) {
+ id = createSecret(token, tenant, namespace, name);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ if (id != null) {
+ return Optional.of(FunctionAuthData.builder().data(id.getBytes()).build());
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+ String fqfn = FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+
+ String secretName = new String(functionAuthData.getData());
+ Actions.Action deleteSecrets = Actions.Action.builder()
+ .actionName(String.format("Deleting secrets for function %s", fqfn))
+ .numRetries(NUM_RETRIES)
+ .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .supplier(() -> {
+ try {
+ V1DeleteOptions v1DeleteOptions = new V1DeleteOptions();
+ v1DeleteOptions.setGracePeriodSeconds(0L);
+ v1DeleteOptions.setPropagationPolicy("Foreground");
+
+ coreClient.deleteNamespacedSecret(secretName,
+ kubeNamespace, v1DeleteOptions, "true",
+ null, null, null);
+ } catch (ApiException e) {
+ // if already deleted
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ log.warn("Secrets for function {} does not exist", fqfn);
+ return Actions.ActionResult.builder().success(true).build();
+ }
+
+ String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+ return Actions.ActionResult.builder()
+ .success(false)
+ .errorMsg(errorMsg)
+ .build();
+ }
+ return Actions.ActionResult.builder().success(true).build();
+ })
+ .build();
+
+ Actions.Action waitForSecretsDeletion = Actions.Action.builder()
+ .actionName(String.format("Waiting for secrets for function %s to complete deletion", fqfn))
+ .numRetries(NUM_RETRIES)
+ .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .supplier(() -> {
+ try {
+ coreClient.readNamespacedSecret(secretName, kubeNamespace,
+ null, null, null);
+
+ } catch (ApiException e) {
+ // statefulset is gone
+ if (e.getCode() == HTTP_NOT_FOUND) {
+ return Actions.ActionResult.builder().success(true).build();
+ }
+ String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+ return Actions.ActionResult.builder()
+ .success(false)
+ .errorMsg(errorMsg)
+ .build();
+ }
+ return Actions.ActionResult.builder()
+ .success(false)
+ .build();
+ })
+ .build();
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ Actions.newBuilder()
+ .addAction(deleteSecrets.toBuilder()
+ .continueOn(true)
+ .build())
+ .addAction(waitForSecretsDeletion.toBuilder()
+ .continueOn(false)
+ .onSuccess(ignore -> success.set(true))
+ .build())
+ .addAction(deleteSecrets.toBuilder()
+ .continueOn(true)
+ .build())
+ .addAction(waitForSecretsDeletion.toBuilder()
+ .onSuccess(ignore -> success.set(true))
+ .build())
+ .run();
+
+ if (!success.get()) {
+ throw new RuntimeException(String.format("Failed to delete secrets for function %s", fqfn));
+ }
+ }
+
+ private String createSecret(String token, String tenant, String namespace, String name) throws ApiException, InterruptedException {
+
+ StringBuilder sb = new StringBuilder();
+ Actions.Action createAuthSecret = Actions.Action.builder()
+ .actionName(String.format("Creating authentication secret for function %s/%s/%s", tenant, namespace, name))
+ .numRetries(NUM_RETRIES)
+ .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .supplier(() -> {
+ String id = RandomStringUtils.random(5, true, true).toLowerCase();
+ V1Secret v1Secret = new V1Secret()
+ .metadata(new V1ObjectMeta().name(getSecretName(id)))
+ .data(Collections.singletonMap(FUNCTION_AUTH_TOKEN, token.getBytes()));
+ try {
+ coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
+ } catch (ApiException e) {
+ // already exists
+ if (e.getCode() == HTTP_CONFLICT) {
+ return Actions.ActionResult.builder()
+ .errorMsg(String.format("Secret %s already present", id))
+ .success(false)
+ .build();
+ }
+
+ String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+ return Actions.ActionResult.builder()
+ .success(false)
+ .errorMsg(errorMsg)
+ .build();
+ }
+
+ sb.append(id.toCharArray());
+ return Actions.ActionResult.builder().success(true).build();
+ })
+ .build();
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ Actions.newBuilder()
+ .addAction(createAuthSecret.toBuilder()
+ .onSuccess(ignore -> success.set(true))
+ .build())
+ .run();
+
+ if (!success.get()) {
+ throw new RuntimeException(String.format("Failed to create authentication secret for function %s/%s/%s", tenant, namespace, name));
+ }
+
+ return sb.toString();
+ }
+
+ private String getSecretName(String id) {
+ return "pf-secret-" + id;
+ }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java
new file mode 100644
index 0000000..5b4a0ab
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+public class NoOpFunctionAuthProvider implements FunctionAuthProvider{
+ @Override
+ public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+
+ }
+
+ @Override
+ public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
+ AuthenticationDataSource authenticationDataSource)
+ throws Exception {
+ return Optional.empty();
+ }
+
+ @Override
+ public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+
+ }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 140de1c..fb51b36 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -80,6 +81,8 @@ import java.util.regex.Pattern;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
* Kubernetes based runtime for running functions.
@@ -94,9 +97,6 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
@VisibleForTesting
public class KubernetesRuntime implements Runtime {
- private static int NUM_RETRIES = 5;
- private static long SLEEP_BETWEEN_RETRIES_MS = 500;
-
private static final String ENV_SHARD_ID = "SHARD_ID";
private static final int maxJobNameSize = 55;
private static final Integer GRPC_PORT = 9093;
@@ -134,7 +134,8 @@ public class KubernetesRuntime implements Runtime {
private final String pulsarAdminUrl;
private final SecretsProviderConfigurator secretsProviderConfigurator;
private int percentMemoryPadding;
-
+ private final KubernetesFunctionAuthProvider functionAuthDataCacheProvider;
+ private final AuthenticationConfig authConfig;
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
@@ -158,7 +159,8 @@ public class KubernetesRuntime implements Runtime {
AuthenticationConfig authConfig,
SecretsProviderConfigurator secretsProviderConfigurator,
Integer expectedMetricsCollectionInterval,
- int percentMemoryPadding) throws Exception {
+ int percentMemoryPadding,
+ KubernetesFunctionAuthProvider functionAuthDataCacheProvider) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
@@ -187,11 +189,16 @@ public class KubernetesRuntime implements Runtime {
break;
}
+ this.authConfig = authConfig;
+
+ this.functionAuthDataCacheProvider = functionAuthDataCacheProvider;
+
this.processArgs = new LinkedList<>();
this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
// use exec to to launch function so that it gets launched in the foreground with the same PID as shell
// so that when we kill the pod, the signal will get propagated to the function code
this.processArgs.add("exec");
+
this.processArgs.addAll(
RuntimeUtils.getCmd(
instanceConfig,
@@ -221,16 +228,20 @@ public class KubernetesRuntime implements Runtime {
*/
@Override
public void start() throws Exception {
- submitService();
+
try {
+ submitService();
submitStatefulSet();
+
} catch (Exception e) {
- log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
+ log.error("Failed start function {}/{}/{} in Kubernetes",
instanceConfig.getFunctionDetails().getTenant(),
instanceConfig.getFunctionDetails().getNamespace(),
instanceConfig.getFunctionDetails().getName(), e);
- deleteService();
+ stop();
+ throw e;
}
+
if (channel == null && stub == null) {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
@@ -363,8 +374,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action createService = Actions.Action.builder()
.actionName(String.format("Submitting service for function %s", fqfn))
- .numRetries(NUM_RETRIES)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final V1Service response;
try {
@@ -428,6 +439,11 @@ public class KubernetesRuntime implements Runtime {
private void submitStatefulSet() throws Exception {
final V1StatefulSet statefulSet = createStatefulSet();
+ // Configure function authentication if needed
+ if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ functionAuthDataCacheProvider.configureAuthDataStatefulSet(
+ statefulSet, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+ }
log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
@@ -435,8 +451,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action createStatefulSet = Actions.Action.builder()
.actionName(String.format("Submitting statefulset for function %s", fqfn))
- .numRetries(NUM_RETRIES)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final V1StatefulSet response;
try {
@@ -482,8 +498,8 @@ public class KubernetesRuntime implements Runtime {
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
Actions.Action deleteStatefulSet = Actions.Action.builder()
.actionName(String.format("Deleting statefulset for function %s", fqfn))
- .numRetries(NUM_RETRIES)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
Response response;
try {
@@ -531,8 +547,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
// set retry period to be about 2x the graceshutdown time
- .numRetries(NUM_RETRIES * 2)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS* 2)
.supplier(() -> {
V1StatefulSet response;
try {
@@ -560,8 +576,8 @@ public class KubernetesRuntime implements Runtime {
// Need to wait for all pods to die so we can cleanup subscriptions.
Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder()
.actionName(String.format("Waiting for pods for function %s to terminate", fqfn))
- .numRetries(NUM_RETRIES * 2)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS * 2)
.supplier(() -> {
String labels = String.format("tenant=%s,namespace=%s,name=%s",
instanceConfig.getFunctionDetails().getTenant(),
@@ -633,8 +649,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
- .numRetries(NUM_RETRIES)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
final Response response;
try {
@@ -679,8 +695,8 @@ public class KubernetesRuntime implements Runtime {
Actions.Action waitForServiceDeletion = Actions.Action.builder()
.actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
- .numRetries(NUM_RETRIES)
- .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+ .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+ .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
.supplier(() -> {
V1Service response;
try {
@@ -738,6 +754,28 @@ public class KubernetesRuntime implements Runtime {
}
private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
+
+ // add auth plugin and parameters if necessary
+ if (authConfig != null) {
+ if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
+ return Arrays.asList(
+ pulsarRootDir + "/bin/pulsar-admin",
+ "--auth-plugin",
+ authConfig.getClientAuthenticationPlugin(),
+ "--auth-params",
+ authConfig.getClientAuthenticationParameters(),
+ "--admin-url",
+ pulsarAdminUrl,
+ "functions",
+ "download",
+ "--path",
+ bkPath,
+ "--destination-file",
+ userCodeFilePath);
+ }
+ }
+
return Arrays.asList(
pulsarRootDir + "/bin/pulsar-admin",
"--admin-url",
@@ -795,6 +833,7 @@ public class KubernetesRuntime implements Runtime {
statefulSet.spec(statefulSetSpec);
+
return statefulSet;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 6a551c2..7fa67ce 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -27,12 +27,15 @@ import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.nio.file.Paths;
+
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
+import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +47,7 @@ import java.util.Timer;
import java.util.TimerTask;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
* Kubernetes based function container factory implementation.
@@ -51,6 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
@Slf4j
public class KubernetesRuntimeFactory implements RuntimeFactory {
+ static int NUM_RETRIES = 5;
+ static long SLEEP_BETWEEN_RETRIES_MS = 500;
+
@Getter
@Setter
@NoArgsConstructor
@@ -158,6 +165,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.functionInstanceMinResources = functionInstanceMinResources;
+ try {
+ setupClient();
+ } catch (Exception e) {
+ log.error("Failed to setup client", e);
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -169,7 +182,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
String originalCodeFileName,
Long expectedHealthCheckInterval) throws Exception {
- setupClient();
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
@@ -181,6 +193,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
default:
throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
}
+
+ // adjust the auth config to support auth
+ if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+ }
+
return new KubernetesRuntime(
appsClient,
coreClient,
@@ -204,7 +222,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
authConfig,
secretsProviderConfigurator,
expectedMetricsCollectionInterval,
- this.kubernetesInfo.getPercentMemoryPadding());
+ this.kubernetesInfo.getPercentMemoryPadding(),
+ getAuthProvider());
}
@Override
@@ -215,16 +234,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails);
validateMinResourcesRequired(functionDetails);
- try {
- setupClient();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails);
}
@VisibleForTesting
- void setupClient() throws Exception {
+ public void setupClient() throws Exception {
if (appsClient == null) {
if (this.kubernetesInfo.getK8Uri() == null) {
log.info("k8Uri is null thus going by defaults");
@@ -309,4 +323,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
}
}
}
+
+ @Override
+ public KubernetesFunctionAuthProvider getAuthProvider() {
+ return new KubernetesSecretsTokenAuthProvider(coreClient, kubernetesInfo.jobNamespace);
+ }
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 2b85815..7090967 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -29,6 +29,8 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import java.nio.file.Paths;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+
/**
* Thread based function container factory implementation.
*/
@@ -125,6 +127,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
default:
throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
}
+
+ // configure auth if necessary
+ if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+ getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+ }
+
return new ProcessRuntime(
instanceConfig,
instanceFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index 19a5fc9..77f660f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -35,6 +35,10 @@ public interface Runtime {
void stop() throws Exception;
+ default void terminate() throws Exception {
+ stop();
+ }
+
boolean isAlive();
Throwable getDeathException();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 248afbb..4384f00 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.runtime;
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
+import org.apache.pulsar.functions.auth.NoOpFunctionAuthProvider;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
@@ -43,7 +45,12 @@ public interface RuntimeFactory extends AutoCloseable {
default void doAdmissionChecks(Function.FunctionDetails functionDetails) { }
+ default FunctionAuthProvider getAuthProvider() throws IllegalAccessException, InstantiationException {
+ return NoOpFunctionAuthProvider.class.newInstance();
+ }
+
@Override
void close();
}
+
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
new file mode 100644
index 0000000..3dde48b
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+public class ClearTextFunctionTokenAuthProviderTest {
+
+ @Test
+ public void testClearTextAuth() throws Exception {
+
+ ClearTextFunctionTokenAuthProvider clearTextFunctionTokenAuthProvider = new ClearTextFunctionTokenAuthProvider();
+
+ Optional<FunctionAuthData> functionAuthData = clearTextFunctionTokenAuthProvider.cacheAuthData("test-tenant",
+ "test-ns", "test-func", new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return "test-token";
+ }
+ });
+
+ Assert.assertTrue(functionAuthData.isPresent());
+ Assert.assertEquals(functionAuthData.get().getData(), "test-token".getBytes());
+
+ AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+ clearTextFunctionTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData.get());
+
+ Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
+ Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "token:test-token");
+
+
+ AuthenticationToken authenticationToken = new AuthenticationToken();
+ authenticationToken.configure(authenticationConfig.getClientAuthenticationParameters());
+ Assert.assertEquals(authenticationToken.getAuthData().getCommandData(), "test-token");
+ }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
new file mode 100644
index 0000000..eac937f
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+import io.kubernetes.client.models.V1Secret;
+import io.kubernetes.client.models.V1ServiceAccount;
+import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.models.V1StatefulSetSpec;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class KubernetesSecretsTokenAuthProviderTest {
+
+ @Test
+ public void testConfigureAuthDataStatefulSet() {
+
+ CoreV1Api coreV1Api = mock(CoreV1Api.class);
+ KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+
+
+ V1StatefulSet statefulSet = new V1StatefulSet();
+ statefulSet.setSpec(
+ new V1StatefulSetSpec().template(
+ new V1PodTemplateSpec().spec(
+ new V1PodSpec().containers(
+ Collections.singletonList(new V1Container())))));
+ FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
+ kubernetesSecretsTokenAuthProvider.configureAuthDataStatefulSet(statefulSet, functionAuthData);
+
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getName(), "function-auth");
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getSecret().getSecretName(), "pf-secret-foo");
+
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(), 1);
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(), 1);
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), "function-auth");
+ Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), "/etc/auth");
+ }
+
+ @Test
+ public void testCacheAuthData() throws ApiException {
+ CoreV1Api coreV1Api = mock(CoreV1Api.class);
+ doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
+ KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+ Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.cacheAuthData("test-tenant",
+ "test-ns", "test-func", new AuthenticationDataSource() {
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return "test-token";
+ }
+ });
+
+ Assert.assertTrue(functionAuthData.isPresent());
+ Assert.assertTrue(StringUtils.isNotBlank(new String(functionAuthData.get().getData())));
+ }
+
+ @Test
+ public void configureAuthenticationConfig() {
+ CoreV1Api coreV1Api = mock(CoreV1Api.class);
+ KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+ AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+ FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
+ kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData);
+
+ Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
+ Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
+ }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
index c97cdb6..8f1172e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
@@ -165,8 +165,6 @@ public class KubernetesRuntimeFactoryTest {
factory = createKubernetesRuntimeFactory(null, null);
FunctionDetails functionDetails = createFunctionDetails();
factory.doAdmissionChecks(functionDetails);
- verify(factory, times(1)).setupClient();
-
}
@Test
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ae006cc..246834e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -68,6 +68,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.Utils.getSinkType;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -154,6 +155,7 @@ public class FunctionActioner {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
+ instance.getFunctionMetaData().getFunctionAuthSpec(),
instanceId, workerConfig.getPulsarFunctionsCluster());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
@@ -164,7 +166,8 @@ public class FunctionActioner {
}
- InstanceConfig createInstanceConfig(FunctionDetails functionDetails, int instanceId, String clusterName) {
+ InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.FunctionAuthenticationSpec
+ functionAuthSpec, int instanceId, String clusterName) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: set correct function id and version when features implemented
@@ -174,6 +177,7 @@ public class FunctionActioner {
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
instanceConfig.setClusterName(clusterName);
+ instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
return instanceConfig;
}
@@ -231,17 +235,9 @@ public class FunctionActioner {
}
}
- public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+ private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
- FunctionDetails details = functionMetaData.getFunctionDetails();
- log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
- instance.getInstanceId());
- if (functionRuntimeInfo.getRuntimeSpawner() != null) {
- functionRuntimeInfo.getRuntimeSpawner().close();
- functionRuntimeInfo.setRuntimeSpawner(null);
- }
-
// clean up function package
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
@@ -250,7 +246,7 @@ public class FunctionActioner {
if (pkgDir.exists()) {
try {
MoreFiles.deleteRecursively(
- Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
+ Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
} catch (IOException e) {
log.warn("Failed to delete package for function: {}",
FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
@@ -258,14 +254,42 @@ public class FunctionActioner {
}
}
+ public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+ Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
+ FunctionMetaData functionMetaData = instance.getFunctionMetaData();
+ FunctionDetails details = functionMetaData.getFunctionDetails();
+ log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
+ instance.getInstanceId());
+ if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+ functionRuntimeInfo.getRuntimeSpawner().close();
+ functionRuntimeInfo.setRuntimeSpawner(null);
+ }
+
+ cleanupFunctionFiles(functionRuntimeInfo);
+ }
+
public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
- FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
- .getFunctionDetails();
- log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(),
- functionRuntimeInfo.getFunctionInstance().getInstanceId());
+ FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
+ log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
+
+ if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+ functionRuntimeInfo.getRuntimeSpawner().close();
+ // cleanup any auth data cached
+ try {
+ functionRuntimeInfo.getRuntimeSpawner()
+ .getRuntimeFactory().getAuthProvider()
+ .cleanUpAuthData(
+ details.getTenant(), details.getNamespace(), details.getName(),
+ getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
+ } catch (Exception e) {
+ log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+ }
+ functionRuntimeInfo.setRuntimeSpawner(null);
+ }
+
+ cleanupFunctionFiles(functionRuntimeInfo);
- stopFunction(functionRuntimeInfo);
//cleanup subscriptions
if (details.getSource().getCleanupSubscription()) {
Map<String, Function.ConsumerSpec> consumerSpecMap = details.getSource().getInputSpecsMap();
@@ -309,8 +333,8 @@ public class FunctionActioner {
SubscriptionStats sub = stats.subscriptions.get(InstanceUtils.getDefaultSubscriptionName(details));
if (sub != null) {
existingConsumers = sub.consumers.stream()
- .map(consumerStats -> consumerStats.metadata)
- .collect(Collectors.toList());
+ .map(consumerStats -> consumerStats.metadata)
+ .collect(Collectors.toList());
}
} catch (PulsarAdminException e1) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 532d852..18ac5b2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -52,4 +53,8 @@ public class FunctionApiResource implements Supplier<WorkerService> {
? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
: null;
}
+
+ public AuthenticationDataHttps clientAuthData() {
+ return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 2bae728..23f479a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,6 +26,7 @@ import static org.apache.pulsar.functions.utils.Reflections.createInstance;
import com.google.gson.Gson;
+import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -43,6 +44,7 @@ import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
@@ -77,6 +78,7 @@ import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -93,6 +95,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -292,7 +295,8 @@ public abstract class ComponentImpl {
final String functionPkgUrl,
final String functionDetailsJson,
final String componentConfigJson,
- final String clientRole) {
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
@@ -377,8 +381,30 @@ public abstract class ComponentImpl {
// function state
FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
- .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
+ .setFunctionDetails(functionDetails)
+ .setCreateTime(System.currentTimeMillis())
+ .setVersion(0);
+ // cache auth if need
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .cacheAuthData(tenant, namespace, componentName, clientAuthenticationDataHttps);
+
+ if (functionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+ .build());
+ }
+ } catch (Exception e) {
+ log.error("Error caching authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
+
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+ }
+ }
PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
try {
@@ -494,6 +520,7 @@ public abstract class ComponentImpl {
String existingComponentConfigJson;
FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+
if (componentType.equals(FUNCTION)) {
FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
@@ -573,9 +600,9 @@ public abstract class ComponentImpl {
throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
}
- // function state
- FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
- .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
+ // merge from existing metadata
+ FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+ .setFunctionDetails(functionDetails);
PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
@@ -590,6 +617,7 @@ public abstract class ComponentImpl {
}
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
updateRequest(functionMetaDataBuilder.build());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index c71e1c2..45fbbf2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -90,7 +90,7 @@ public class FunctionsImplV2 {
uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
functionDetailsJson, String functionConfigJson, String clientAppId) {
delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+ functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId, null);
return Response.ok().build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 4d27134..0415be2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -69,7 +69,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, functionConfigJson, clientAppId());
+ functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index ee7d1a4..e5d257f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
@@ -34,7 +33,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -62,7 +60,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@PUT
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index f2b1936..e07bfe0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -60,7 +60,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sourceConfigJson, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index a66ba90..91fec23 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -48,7 +48,14 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@Slf4j
public class FunctionRuntimeManagerTest {
@@ -663,7 +670,9 @@ public class FunctionRuntimeManagerTest {
public void testExternallyManagedRuntimeUpdate() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setKubernetesContainerFactory(new WorkerConfig.KubernetesContainerFactory());
+ workerConfig.setKubernetesContainerFactory(
+ new WorkerConfig.KubernetesContainerFactory()
+ .setSubmittingInsidePod(false));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
@@ -679,8 +688,8 @@ public class FunctionRuntimeManagerTest {
doReturn(pulsarClient).when(workerService).getClient();
doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
-
KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
+ doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index bdfb50e..e02b253 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -487,7 +487,7 @@ public class FunctionApiV3ResourceTest {
functionPkgUrl,
null,
new Gson().toJson(functionConfig),
- null);
+ null, null);
}
@@ -502,7 +502,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
@@ -1446,7 +1446,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
- null, new Gson().toJson(functionConfig), null);
+ null, new Gson().toJson(functionConfig), null, null);
}
@@ -1478,7 +1478,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
- null, new Gson().toJson(functionConfig), null);
+ null, new Gson().toJson(functionConfig), null, null);
}
public static FunctionConfig createDefaultFunctionConfig() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 9e55800..1b475ed 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -420,7 +420,7 @@ public class SinkApiV3ResourceTest {
pkgUrl,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@@ -435,7 +435,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink already exists")
@@ -529,7 +529,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 6ba8914..43f3303 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -389,7 +389,7 @@ public class SourceApiV3ResourceTest {
pkgUrl,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@@ -404,7 +404,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source already exists")
@@ -498,7 +498,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")