You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/08 22:03:14 UTC
[pulsar] 08/13: Made pulsar admin to use Source/Sink endpoints
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4334d628fbffebc39d6d162911e1c9532c6fea83
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Mon Oct 8 09:32:34 2018 -0700
Made pulsar admin to use Source/Sink endpoints
---
pulsar-client-admin/pom.xml | 6 +
.../apache/pulsar/client/admin/PulsarAdmin.java | 36 +--
.../java/org/apache/pulsar/client/admin/Sink.java | 277 +++++++++++++++++++++
.../org/apache/pulsar/client/admin/Source.java | 277 +++++++++++++++++++++
.../pulsar/client/admin/internal/SinkImpl.java | 270 ++++++++++++++++++++
.../pulsar/client/admin/internal/SourceImpl.java | 270 ++++++++++++++++++++
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 24 +-
.../org/apache/pulsar/admin/cli/CmdSources.java | 24 +-
8 files changed, 1147 insertions(+), 37 deletions(-)
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml
index b42ea2c..ac39524 100644
--- a/pulsar-client-admin/pom.xml
+++ b/pulsar-client-admin/pom.xml
@@ -46,6 +46,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index a1502b0..0953d22 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -32,21 +32,7 @@ import javax.ws.rs.client.WebTarget;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.pulsar.client.admin.internal.BookiesImpl;
-import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
-import org.apache.pulsar.client.admin.internal.BrokersImpl;
-import org.apache.pulsar.client.admin.internal.ClustersImpl;
-import org.apache.pulsar.client.admin.internal.FunctionsImpl;
-import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
-import org.apache.pulsar.client.admin.internal.LookupImpl;
-import org.apache.pulsar.client.admin.internal.NamespacesImpl;
-import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
-import org.apache.pulsar.client.admin.internal.SchemasImpl;
-import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.admin.internal.WorkerImpl;
-import org.apache.pulsar.client.admin.internal.TenantsImpl;
-import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
-import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
+import org.apache.pulsar.client.admin.internal.*;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
@@ -85,6 +71,8 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
+ private final Source source;
+ private final Sink sink;
private final Worker worker;
private final Schemas schemas;
protected final WebTarget root;
@@ -189,6 +177,8 @@ public class PulsarAdmin implements Closeable {
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
+ this.source = new SourceImpl(root, auth);
+ this.sink = new SinkImpl(root, auth);
this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
@@ -358,6 +348,22 @@ public class PulsarAdmin implements Closeable {
}
/**
+ *
+ * @return the source management object
+ */
+ public Source source() {
+ return source;
+ }
+
+ /**
+ *
+ * @return the sink management object
+ */
+ public Sink sink() {
+ return sink;
+ }
+
+ /**
*
* @return the Worker stats
*/
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
new file mode 100644
index 0000000..3f8fe2f
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Sink management.
+ */
+public interface Sink {
+ /**
+ * Get the list of sinks.
+ * <p>
+ * Get the list of all the Pulsar Sinks.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["f1", "f2", "f3"]</code>
+ * </pre>
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the configuration for the specified sink.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @return the sink configuration
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to get the configuration of the sink
+ * @throws NotFoundException
+ * Sink doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+ /**
+ * Create a new sink.
+ *
+ * @param sinkConfig
+ * the sink configuration object
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException;
+
+ /**
+ * <pre>
+ * Create a new sink by providing a url from which the sink package can be downloaded. Supported url schemes: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ * </pre>
+ *
+ * @param sinkConfig
+ * the sink configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ * @throws PulsarAdminException
+ */
+ void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException;
+
+ /**
+ * Update the configuration for a sink.
+ * <p>
+ *
+ * @param sinkConfig
+ * the sink configuration object
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to update the sink
+ * @throws NotFoundException
+ * Sink doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException;
+
+ /**
+ * Update the configuration for a sink.
+ * <pre>
+ * Update a sink by providing a url from which the sink package can be downloaded. Supported url schemes: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ * </pre>
+ *
+ * @param sinkConfig
+ * the sink configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ * @throws NotAuthorizedException
+ * You don't have admin permission to update the sink
+ * @throws NotFoundException
+ * Sink doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException;
+
+ /**
+ * Delete an existing sink
+ * <p>
+ * Delete a sink
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Sink does not exist
+ * @throws PreconditionFailedException
+ * A precondition for deleting the sink is not met
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+ /**
+ * Gets the current status of a sink.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ FunctionStatusList getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+ /**
+ * Gets the current status of a sink instance.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ * @param id
+ * Sink instance-id
+ * @return the status of the sink instance with the given id
+ * @throws PulsarAdminException
+ */
+ FunctionStatus getSinkStatus(String tenant, String namespace, String sink, int id)
+ throws PulsarAdminException;
+
+ /**
+ * Restart sink instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @param instanceId
+ * Sink instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException;
+
+ /**
+ * Restart all sink instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+
+ /**
+ * Stop sink instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @param instanceId
+ * Sink instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminException;
+
+ /**
+ * Stop all sink instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param sink
+ * Sink name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void stopSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+
+ /**
+ * Fetches a list of supported Pulsar IO sinks currently running in cluster mode
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ *
+ */
+ List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
new file mode 100644
index 0000000..3c43cf2
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
+import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Admin interface for Source management.
+ */
+public interface Source {
+ /**
+ * Get the list of sources.
+ * <p>
+ * Get the list of all the Pulsar Sources.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["f1", "f2", "f3"]</code>
+ * </pre>
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the configuration for the specified source.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @return the source configuration
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to get the configuration of the source
+ * @throws NotFoundException
+ * Source doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+ /**
+ * Create a new source.
+ *
+ * @param sourceConfig
+ * the source configuration object
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException;
+
+ /**
+ * <pre>
+ * Create a new source by providing a url from which the source package can be downloaded. Supported url schemes: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ * </pre>
+ *
+ * @param sourceConfig
+ * the source configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ * @throws PulsarAdminException
+ */
+ void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException;
+
+ /**
+ * Update the configuration for a source.
+ * <p>
+ *
+ * @param sourceConfig
+ * the source configuration object
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to update the source
+ * @throws NotFoundException
+ * Source doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException;
+
+ /**
+ * Update the configuration for a source.
+ * <pre>
+ * Update a source by providing a url from which the source package can be downloaded. Supported url schemes: http/file
+ * eg:
+ * File: file:/dir/fileName.jar
+ * Http: http://www.repo.com/fileName.jar
+ * </pre>
+ *
+ * @param sourceConfig
+ * the source configuration object
+ * @param pkgUrl
+ * url from which pkg can be downloaded
+ * @throws NotAuthorizedException
+ * You don't have admin permission to update the source
+ * @throws NotFoundException
+ * Source doesn't exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException;
+
+ /**
+ * Delete an existing source
+ * <p>
+ * Delete a source
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Source does not exist
+ * @throws PreconditionFailedException
+ * A precondition for deleting the source is not met
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+ /**
+ * Gets the current status of a source.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ FunctionStatusList getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminException;
+
+ /**
+ * Gets the current status of a source instance.
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ * @param id
+ * Source instance-id
+ * @return the status of the source instance with the given id
+ * @throws PulsarAdminException
+ */
+ FunctionStatus getSourceStatus(String tenant, String namespace, String source, int id)
+ throws PulsarAdminException;
+
+ /**
+ * Restart source instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @param instanceId
+ * Source instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException;
+
+ /**
+ * Restart all source instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void restartSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+
+ /**
+ * Stop source instance
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @param instanceId
+ * Source instanceId
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminException;
+
+ /**
+ * Stop all source instances
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param source
+ * Source name
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void stopSource(String tenant, String namespace, String source) throws PulsarAdminException;
+
+ /**
+ * Fetches a list of supported Pulsar IO sources currently running in cluster mode
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ *
+ */
+ List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
new file mode 100644
index 0000000..b74f4ff
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Sink;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SinkConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SinkImpl extends BaseResource implements Sink {
+
+ private final WebTarget sink;
+
+ public SinkImpl(WebTarget web, Authentication auth) {
+ super(auth);
+ this.sink = web.path("/admin/sink");
+ }
+
+ @Override
+ public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
+ try {
+ Response response = request(sink.path(tenant).path(namespace)).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<String>>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+ try {
+ Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ mergeJson(jsonResponse, functionDetailsBuilder);
+ return functionDetailsBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionStatusList getSinkStatus(
+ String tenant, String namespace, String sinkName) throws PulsarAdminException {
+ try {
+ Response response = request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
+ mergeJson(jsonResponse, functionStatusBuilder);
+ return functionStatusBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionStatus getSinkStatus(
+ String tenant, String namespace, String sinkName, int id) throws PulsarAdminException {
+ try {
+ Response response = request(
+ sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status"))
+ .get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
+ mergeJson(jsonResponse, functionStatusBuilder);
+ return functionStatusBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ if (fileName != null && !fileName.startsWith("builtin://")) {
+ // If the sink is built in (builtin:// prefix), we don't need to upload a package here
+ mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ }
+
+ mp.bodyPart(new FormDataBodyPart("sinkConfig",
+ new Gson().toJson(sinkConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+ .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+ mp.bodyPart(new FormDataBodyPart("sinkConfig",
+ new Gson().toJson(sinkConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+ .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void deleteSink(String cluster, String namespace, String function) throws PulsarAdminException {
+ try {
+ request(sink.path(cluster).path(namespace).path(function))
+ .delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ if (fileName != null && !fileName.startsWith("builtin://")) {
+ // If the sink is built in (builtin:// prefix), we don't need to upload a package here
+ mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ }
+
+ mp.bodyPart(new FormDataBodyPart("sinkConfig",
+ new Gson().toJson(sinkConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
+ .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+ mp.bodyPart(new FormDataBodyPart("sinkConfig", new Gson().toJson(sinkConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace())
+ .path(sinkConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
+ ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void restartSink(String tenant, String namespace, String functionName, int instanceId)
+ throws PulsarAdminException {
+ try {
+ request(sink.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+ .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void restartSink(String tenant, String namespace, String functionName) throws PulsarAdminException {
+ try {
+ request(sink.path(tenant).path(namespace).path(functionName).path("restart"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void stopSink(String tenant, String namespace, String sinkName, int instanceId)
+ throws PulsarAdminException {
+ try {
+ request(sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(instanceId))
+ .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void stopSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+ try {
+ request(sink.path(tenant).path(namespace).path(sinkName).path("stop"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public List<ConnectorDefinition> getBuiltInSinks() throws PulsarAdminException {
+ try {
+ Response response = request(sink.path("builtinsink")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+
+ public static void mergeJson(String json, Builder builder) throws IOException {
+ JsonFormat.parser().merge(json, builder);
+ }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
new file mode 100644
index 0000000..3ae0a6f
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import com.google.gson.Gson;
+import com.google.protobuf.AbstractMessage.Builder;
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Source;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
+import org.apache.pulsar.functions.utils.SourceConfig;
+import org.glassfish.jersey.media.multipart.FormDataBodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
+
+import javax.ws.rs.ClientErrorException;
+import javax.ws.rs.ServerErrorException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.GenericType;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class SourceImpl extends BaseResource implements Source {
+
+ private final WebTarget source;
+
+ public SourceImpl(WebTarget web, Authentication auth) {
+ super(auth);
+ this.source = web.path("/admin/source");
+ }
+
+ @Override
+ public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+ try {
+ Response response = request(source.path(tenant).path(namespace)).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<String>>() {
+ });
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+ try {
+ Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ mergeJson(jsonResponse, functionDetailsBuilder);
+ return functionDetailsBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionStatusList getSourceStatus(
+ String tenant, String namespace, String sourceName) throws PulsarAdminException {
+ try {
+ Response response = request(source.path(tenant).path(namespace).path(sourceName).path("status")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionStatusList.Builder functionStatusBuilder = FunctionStatusList.newBuilder();
+ mergeJson(jsonResponse, functionStatusBuilder);
+ return functionStatusBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public FunctionStatus getSourceStatus(
+ String tenant, String namespace, String sourceName, int id) throws PulsarAdminException {
+ try {
+ Response response = request(
+ source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status"))
+ .get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ String jsonResponse = response.readEntity(String.class);
+ FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
+ mergeJson(jsonResponse, functionStatusBuilder);
+ return functionStatusBuilder.build();
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ if (fileName != null && !fileName.startsWith("builtin://")) {
+ // If the function code is built in, we don't need to submit here
+ mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ }
+
+ mp.bodyPart(new FormDataBodyPart("sourceConfig",
+ new Gson().toJson(sourceConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+ .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+ mp.bodyPart(new FormDataBodyPart("sourceConfig",
+ new Gson().toJson(sourceConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+ .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void deleteSource(String cluster, String namespace, String function) throws PulsarAdminException {
+ try {
+ request(source.path(cluster).path(namespace).path(function))
+ .delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ if (fileName != null && !fileName.startsWith("builtin://")) {
+ // If the function code is built in, we don't need to submit here
+ mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ }
+
+ mp.bodyPart(new FormDataBodyPart("sourceConfig",
+ new Gson().toJson(sourceConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
+ .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminException {
+ try {
+ final FormDataMultiPart mp = new FormDataMultiPart();
+
+ mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+
+ mp.bodyPart(new FormDataBodyPart("sourceConfig", new Gson().toJson(sourceConfig),
+ MediaType.APPLICATION_JSON_TYPE));
+ request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace())
+ .path(sourceConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
+ ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void restartSource(String tenant, String namespace, String functionName, int instanceId)
+ throws PulsarAdminException {
+ try {
+ request(source.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
+ .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void restartSource(String tenant, String namespace, String functionName) throws PulsarAdminException {
+ try {
+ request(source.path(tenant).path(namespace).path(functionName).path("restart"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void stopSource(String tenant, String namespace, String sourceName, int instanceId)
+ throws PulsarAdminException {
+ try {
+ request(source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(instanceId))
+ .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void stopSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+ try {
+ request(source.path(tenant).path(namespace).path(sourceName).path("stop"))
+ .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public List<ConnectorDefinition> getBuiltInSources() throws PulsarAdminException {
+ try {
+ Response response = request(source.path("builtinsource")).get();
+ if (!response.getStatusInfo().equals(Response.Status.OK)) {
+ throw new ClientErrorException(response);
+ }
+ return response.readEntity(new GenericType<List<ConnectorDefinition>>() {});
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+
+ public static void mergeJson(String json, Builder builder) throws IOException {
+ JsonFormat.parser().merge(json, builder);
+ }
+
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index bd5af79..e56a343 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public class CmdSinks extends CmdBase {
jcommander.addCommand("update", updateSink);
jcommander.addCommand("delete", deleteSink);
jcommander.addCommand("localrun", localSinkRunner);
- jcommander.addCommand("available-sinks", new ListSinks());
+ jcommander.addCommand("available-sinks", new ListBuiltInSinks());
}
/**
@@ -187,9 +188,9 @@ public class CmdSinks extends CmdBase {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
- admin.functions().createFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+ admin.sink().createSinkWithUrl(sinkConfig, sinkConfig.getArchive());
} else {
- admin.functions().createFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+ admin.sink().createSink(sinkConfig, sinkConfig.getArchive());
}
print("Created successfully");
}
@@ -200,9 +201,9 @@ public class CmdSinks extends CmdBase {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(archive)) {
- admin.functions().updateFunctionWithUrl(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+ admin.sink().updateSinkWithUrl(sinkConfig, sinkConfig.getArchive());
} else {
- admin.functions().updateFunction(SinkConfigUtils.convert(sinkConfig), sinkConfig.getArchive());
+ admin.sink().updateSink(sinkConfig, sinkConfig.getArchive());
}
print("Updated successfully");
}
@@ -282,6 +283,8 @@ public class CmdSinks extends CmdBase {
protected SinkConfig sinkConfig;
+ protected NarClassLoader classLoader;
+
private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_subsName)) subsName = DEPRECATED_subsName;
if (!StringUtils.isBlank(DEPRECATED_topicsPattern)) topicsPattern = DEPRECATED_topicsPattern;
@@ -449,7 +452,6 @@ public class CmdSinks extends CmdBase {
}
// if jar file is present locally then load jar and validate SinkClass in it
- ClassLoader classLoader = null;
if (archivePath != null) {
if (!fileExists(archivePath)) {
throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -482,14 +484,14 @@ public class CmdSinks extends CmdBase {
throws IOException {
org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
= org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
- Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig)), functionDetailsBuilder);
+ Utils.mergeJson(FunctionsImpl.printJson(SinkConfigUtils.convert(sinkConfig, classLoader)), functionDetailsBuilder);
return functionDetailsBuilder.build();
}
protected String validateSinkType(String sinkType) throws IOException {
Set<String> availableSinks;
try {
- availableSinks = admin.functions().getSinks();
+ availableSinks = admin.sink().getBuiltInSinks().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
} catch (PulsarAdminException e) {
throw new IOException(e);
}
@@ -533,16 +535,16 @@ public class CmdSinks extends CmdBase {
@Override
void runCmd() throws Exception {
- admin.functions().deleteFunction(tenant, namespace, name);
+ admin.sink().deleteSink(tenant, namespace, name);
print("Deleted successfully");
}
}
@Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
- public class ListSinks extends BaseCommand {
+ public class ListBuiltInSinks extends BaseCommand {
@Override
void runCmd() throws Exception {
- admin.functions().getConnectorsList().stream().filter(x -> isNotBlank(x.getSinkClass()))
+ admin.sink().getBuiltInSinks().stream().filter(x -> isNotBlank(x.getSinkClass()))
.forEach(connector -> {
System.out.println(connector.getName());
System.out.println(WordUtils.wrap(connector.getDescription(), 80));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 5d0c84c..f2d7681 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -37,6 +37,7 @@ import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -79,7 +80,7 @@ public class CmdSources extends CmdBase {
jcommander.addCommand("update", updateSource);
jcommander.addCommand("delete", deleteSource);
jcommander.addCommand("localrun", localSourceRunner);
- jcommander.addCommand("available-sources", new ListSources());
+ jcommander.addCommand("available-sources", new ListBuiltInSources());
}
/**
@@ -187,9 +188,9 @@ public class CmdSources extends CmdBase {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
- admin.functions().createFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+ admin.source().createSourceWithUrl(sourceConfig, sourceConfig.getArchive());
} else {
- admin.functions().createFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+ admin.source().createSource(sourceConfig, sourceConfig.getArchive());
}
print("Created successfully");
}
@@ -200,9 +201,9 @@ public class CmdSources extends CmdBase {
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
- admin.functions().updateFunctionWithUrl(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+ admin.source().updateSourceWithUrl(sourceConfig, sourceConfig.getArchive());
} else {
- admin.functions().updateFunction(SourceConfigUtils.convert(sourceConfig), sourceConfig.getArchive());
+ admin.source().updateSource(sourceConfig, sourceConfig.getArchive());
}
print("Updated successfully");
}
@@ -267,6 +268,8 @@ public class CmdSources extends CmdBase {
protected SourceConfig sourceConfig;
+ protected NarClassLoader classLoader;
+
private void mergeArgs() {
if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
@@ -403,7 +406,6 @@ public class CmdSources extends CmdBase {
// if jar file is present locally then load jar and validate SinkClass in it
- ClassLoader classLoader = null;
if (archivePath != null) {
if (!fileExists(archivePath)) {
throw new ParameterException("Archive file " + archivePath + " does not exist");
@@ -436,14 +438,14 @@ public class CmdSources extends CmdBase {
throws IOException {
org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
= org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
- Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig)), functionDetailsBuilder);
+ Utils.mergeJson(FunctionsImpl.printJson(SourceConfigUtils.convert(sourceConfig, classLoader)), functionDetailsBuilder);
return functionDetailsBuilder.build();
}
protected String validateSourceType(String sourceType) throws IOException {
Set<String> availableSources;
try {
- availableSources = admin.functions().getSources();
+ availableSources = admin.source().getBuiltInSources().stream().map(ConnectorDefinition::getName).collect(Collectors.toSet());
} catch (PulsarAdminException e) {
throw new IOException(e);
}
@@ -487,16 +489,16 @@ public class CmdSources extends CmdBase {
@Override
void runCmd() throws Exception {
- admin.functions().deleteFunction(tenant, namespace, name);
+ admin.source().deleteSource(tenant, namespace, name);
print("Delete source successfully");
}
}
@Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
- public class ListSources extends BaseCommand {
+ public class ListBuiltInSources extends BaseCommand {
@Override
void runCmd() throws Exception {
- admin.functions().getConnectorsList().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass()))
+ admin.source().getBuiltInSources().stream().filter(x -> !StringUtils.isEmpty(x.getSourceClass()))
.forEach(connector -> {
System.out.println(connector.getName());
System.out.println(WordUtils.wrap(connector.getDescription(), 80));