You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:24:41 UTC
[pulsar] branch master updated: [pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4d75f1f [pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)
4d75f1f is described below
commit 4d75f1f129c1771eea60dfee5f4655979121e311
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 17:24:35 2019 -0700
[pulsar-function] fix backward compatibility with 2.2 auth not working (#4241)
* [pulsar-function] fix backward compatibility with 2.2 auth not working
* Fix test
* remove authentication from upload and download api
---
.../apache/pulsar/broker/admin/v2/Functions.java | 23 ++++++------
.../functions/worker/rest/api/FunctionsImplV2.java | 42 +++++++++++-----------
.../worker/rest/api/v2/FunctionApiV2Resource.java | 27 +++++++-------
.../rest/api/v2/FunctionApiV2ResourceTest.java | 12 +++----
.../rest/api/v3/FunctionApiV3ResourceTest.java | 13 +++++--
5 files changed, 65 insertions(+), 52 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 25995d2..051bcc6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -142,7 +142,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, clientAppId());
}
@GET
@@ -161,7 +161,8 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+ clientAppId());
}
@GET
@@ -178,7 +179,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatusV2(
- tenant, namespace, functionName, uri.getRequestUri());
+ tenant, namespace, functionName, uri.getRequestUri(), clientAppId());
}
@GET
@@ -194,7 +195,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
@Path("/{tenant}/{namespace}")
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions( tenant, namespace);
+ return functions.listFunctions( tenant, namespace, clientAppId());
}
@POST
@@ -216,7 +217,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("data") String triggerValue,
final @FormDataParam("dataStream") InputStream triggerStream,
final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId());
}
@GET
@@ -235,7 +236,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("key") String key) {
- return functions.getFunctionState(tenant, namespace, functionName, key);
+ return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId());
}
@POST
@@ -248,7 +249,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId());
}
@POST
@@ -260,7 +261,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.restartFunctionInstances(tenant, namespace, functionName);
+ return functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId());
}
@POST
@@ -273,7 +274,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId());
}
@POST
@@ -285,7 +286,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.stopFunctionInstances(tenant, namespace, functionName);
+ return functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId());
}
@POST
@@ -307,7 +308,7 @@ public class Functions extends AdminResource implements Supplier<WorkerService>
)
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
- return functions.downloadFunction(path);
+ return functions.downloadFunction(path, clientAppId());
}
@GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index d8b8a4f..b1da329 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -57,11 +57,11 @@ public class FunctionsImplV2 {
this.delegate = delegate;
}
- public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+ public Response getFunctionInfo(final String tenant, final String namespace, final String functionName, String clientRole)
throws IOException {
// run just for parameter checks
- delegate.getFunctionInfo(tenant, namespace, functionName, null, null);
+ delegate.getFunctionInfo(tenant, namespace, functionName, clientRole, null);
FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
@@ -72,18 +72,18 @@ public class FunctionsImplV2 {
}
public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
- final String instanceId, URI uri) throws IOException {
+ final String instanceId, URI uri, String clientRole) throws IOException {
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
- functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
+ functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, clientRole, null);
String jsonResponse = FunctionCommon.printJson(toProto(functionInstanceStatus, instanceId));
return Response.status(Response.Status.OK).entity(jsonResponse).build();
}
- public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
+ public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri, String clientRole) throws
IOException {
- FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, null, null);
+ FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, clientRole, null);
InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
toProto(functionInstanceStatus.getStatus(),
@@ -142,20 +142,20 @@ public class FunctionsImplV2 {
return Response.ok().build();
}
- public Response listFunctions(String tenant, String namespace) {
- Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, null, null);
+ public Response listFunctions(String tenant, String namespace, String clientRole) {
+ Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, clientRole, null);
return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
}
public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
- InputStream triggerStream, String topic) {
- String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, null, null);
+ InputStream triggerStream, String topic, String clientRole) {
+ String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientRole, null);
return Response.status(Response.Status.OK).entity(result).build();
}
- public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
+ public Response getFunctionState(String tenant, String namespace, String functionName, String key, String clientRole) {
FunctionState functionState = delegate.getFunctionState(
- tenant, namespace, functionName, key, null, null);
+ tenant, namespace, functionName, key, clientRole, null);
String value;
if (functionState.getNumberValue() != null) {
@@ -169,24 +169,24 @@ public class FunctionsImplV2 {
}
public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
- uri) {
- delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, null, null);
+ uri, String clientRole) {
+ delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
return Response.ok().build();
}
- public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
- delegate.restartFunctionInstances(tenant, namespace, functionName, null, null);
+ public Response restartFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
+ delegate.restartFunctionInstances(tenant, namespace, functionName, clientRole, null);
return Response.ok().build();
}
public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
- uri) {
- delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, null ,null);
+ uri, String clientRole) {
+ delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole ,null);
return Response.ok().build();
}
- public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
- delegate.stopFunctionInstances(tenant, namespace, functionName, null, null);
+ public Response stopFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
+ delegate.stopFunctionInstances(tenant, namespace, functionName, clientRole, null);
return Response.ok().build();
}
@@ -195,7 +195,7 @@ public class FunctionsImplV2 {
return Response.ok().build();
}
- public Response downloadFunction(String path) {
+ public Response downloadFunction(String path, String clientRole) {
return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 23d3b4f..c978cad 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -132,7 +132,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionInfo(
- tenant, namespace, functionName);
+ tenant, namespace, functionName, clientAppId());
}
@GET
@@ -151,7 +151,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+ clientAppId());
}
@GET
@@ -167,8 +168,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response getFunctionStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStatusV2(
- tenant, namespace, functionName, uri.getRequestUri());
+ return functions.getFunctionStatusV2(tenant, namespace, functionName, uri.getRequestUri(), clientAppId());
}
@GET
@@ -184,7 +184,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}")
public Response listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions( tenant, namespace);
+ return functions.listFunctions(tenant, namespace, clientAppId());
}
@POST
@@ -206,7 +206,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("data") String triggerValue,
final @FormDataParam("dataStream") InputStream triggerStream,
final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic,
+ clientAppId());
}
@GET
@@ -225,7 +226,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("key") String key) {
- return functions.getFunctionState(tenant, namespace, functionName, key);
+ return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId());
}
@POST
@@ -238,7 +239,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+ clientAppId());
}
@POST
@@ -250,7 +252,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.restartFunctionInstances(tenant, namespace, functionName);
+ return functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId());
}
@POST
@@ -263,7 +265,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(),
+ clientAppId());
}
@POST
@@ -275,7 +278,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
@Consumes(MediaType.APPLICATION_JSON)
public Response stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
- return functions.stopFunctionInstances(tenant, namespace, functionName);
+ return functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId());
}
@POST
@@ -297,7 +300,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
)
@Path("/download")
public Response downloadFunction(final @QueryParam("path") String path) {
- return functions.downloadFunction(path);
+ return functions.downloadFunction(path, clientAppId());
}
@GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 053b19d..b05b0cd 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -1305,7 +1305,7 @@ public class FunctionApiV2ResourceTest {
resource.getFunctionInfo(
tenant,
namespace,
- function
+ function, null
);
}
@@ -1314,7 +1314,7 @@ public class FunctionApiV2ResourceTest {
String json = (String) resource.getFunctionInfo(
tenant,
namespace,
- function
+ function, null
).getEntity();
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
mergeJson(json, functionDetailsBuilder);
@@ -1399,7 +1399,7 @@ public class FunctionApiV2ResourceTest {
) {
resource.listFunctions(
tenant,
- namespace
+ namespace, null
);
}
@@ -1407,7 +1407,7 @@ public class FunctionApiV2ResourceTest {
private List<String> listDefaultFunctions() {
return new Gson().fromJson((String) resource.listFunctions(
tenant,
- namespace
+ namespace, null
).getEntity(), List.class);
}
@@ -1434,7 +1434,7 @@ public class FunctionApiV2ResourceTest {
String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
- StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity();
+ StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl, null).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
@@ -1449,7 +1449,7 @@ public class FunctionApiV2ResourceTest {
String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
- StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity();
+ StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation, null).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 1eff4e8..c364906 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -76,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@@ -1413,7 +1414,11 @@ public class FunctionApiV3ResourceTest {
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionsImpl function = new FunctionsImpl(null);
+ WorkerService worker = mock(WorkerService.class);
+ WorkerConfig config = mock(WorkerConfig.class);
+ when(config.isAuthorizationEnabled()).thenReturn(false);
+ when(worker.getWorkerConfig()).thenReturn(config);
+ FunctionsImpl function = new FunctionsImpl(()-> worker);
StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
@@ -1428,7 +1433,11 @@ public class FunctionApiV3ResourceTest {
public void testDownloadFunctionFile() throws Exception {
String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionsImpl function = new FunctionsImpl(null);
+ WorkerService worker = mock(WorkerService.class);
+ WorkerConfig config = mock(WorkerConfig.class);
+ when(config.isAuthorizationEnabled()).thenReturn(false);
+ when(worker.getWorkerConfig()).thenReturn(config);
+ FunctionsImpl function = new FunctionsImpl(() -> worker);
StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);