You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/08 06:06:43 UTC
[pulsar] branch master updated: fix: bug when function auth is not
enabled (#3980)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23926ad fix: bug when function auth is not enabled (#3980)
23926ad is described below
commit 23926ad5ee60db744a8dcd21dff4ab991acfac2b
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Apr 7 23:06:39 2019 -0700
fix: bug when function auth is not enabled (#3980)
* fix bug when function auth is not enabled
* add comment and tests
* fix bug
* improving tests
---
.../pulsar/functions/worker/FunctionActioner.java | 29 +++++++----
.../functions/worker/FunctionActionerTest.java | 59 ++++++++++++++++++++++
2 files changed, 79 insertions(+), 9 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 1488cf2..175d6e5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -154,8 +154,14 @@ public class FunctionActioner {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
+ // Check whether functionAuthenticationSpec actually has data; if it does not, use null instead. With protobuf,
+ // the getter never returns null even when the field is not set, so the "has" method must be used to check.
+ Function.FunctionAuthenticationSpec functionAuthenticationSpec
+ = instance.getFunctionMetaData().hasFunctionAuthSpec()
+ ? instance.getFunctionMetaData().getFunctionAuthSpec() : null;
+
InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
- instance.getFunctionMetaData().getFunctionAuthSpec(),
+ functionAuthenticationSpec,
instanceId, workerConfig.getPulsarFunctionsCluster());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
@@ -275,15 +281,20 @@ public class FunctionActioner {
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
+
// cleanup any auth data cached
- try {
- functionRuntimeInfo.getRuntimeSpawner()
- .getRuntimeFactory().getAuthProvider()
- .cleanUpAuthData(
- details.getTenant(), details.getNamespace(), details.getName(),
- getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
- } catch (Exception e) {
- log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+ if (functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec() != null) {
+ try {
+ log.info("{}-{} Cleaning up authentication data for function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
+ functionRuntimeInfo.getRuntimeSpawner()
+ .getRuntimeFactory().getAuthProvider()
+ .cleanUpAuthData(
+ details.getTenant(), details.getNamespace(), details.getName(),
+ getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
+
+ } catch (Exception e) {
+ log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+ }
}
functionRuntimeInfo.setRuntimeSpawner(null);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index cd636d4..c93569b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -19,19 +19,23 @@
package org.apache.pulsar.functions.worker;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.testng.annotations.Test;
import static org.apache.pulsar.common.functions.Utils.FILE;
@@ -147,4 +151,59 @@ public class FunctionActionerTest {
}
}
+ @Test
+ public void testFunctionAuthDisabled() throws Exception {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ String downloadDir = this.getClass().getProtectionDomain().getCodeSource().getLocation().getPath();
+ workerConfig.setDownloadDirectory(downloadDir);
+
+ RuntimeFactory factory = mock(RuntimeFactory.class);
+ Runtime runtime = mock(Runtime.class);
+ doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+ doNothing().when(runtime).start();
+ Namespace dlogNamespace = mock(Namespace.class);
+ final String exceptionMsg = "dl namespace not-found";
+ doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
+
+ @SuppressWarnings("resource")
+ FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
+ new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
+
+
+ String pkgPathLocation = "http://invalid/my-file.jar";
+ Function.FunctionMetaData functionMeta = Function.FunctionMetaData.newBuilder()
+ .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+ .setNamespace("test-namespace").setName("func-1"))
+ .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
+ .build();
+
+ Function.Instance instance = Function.Instance.newBuilder()
+ .setFunctionMetaData(functionMeta).build();
+
+ RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo"));
+
+ assertTrue(runtimeSpawner.getInstanceConfig().getFunctionAuthenticationSpec() == null);
+
+ FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+
+ RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
+
+ FunctionAuthProvider functionAuthProvider = mock(FunctionAuthProvider.class);
+ doReturn(functionAuthProvider).when(runtimeFactory).getAuthProvider();
+
+ doReturn(runtimeFactory).when(runtimeSpawner).getRuntimeFactory();
+ doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+ doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
+
+ actioner.terminateFunction(functionRuntimeInfo);
+
+ // make sure cleanup of cached auth data is never invoked when function auth is not enabled
+ verify(functionAuthProvider, times(0)).cleanUpAuthData(any(), any(), any(), any());
+ }
+
}