You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/05/25 18:40:04 UTC
[pulsar] branch master updated: [Issue-10109] [admin client] Add
--batch-source-config switch to the Pulsar Admin Source API (#10593)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 85effc4 [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593)
85effc4 is described below
commit 85effc4048199e30bd70edac17bf2d86b49f003b
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Tue May 25 11:39:10 2021 -0700
[Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593)
Co-authored-by: David Kjerrumgaard <dk...@splunk.com>
---
pulsar-client-tools/pom.xml | 6 ++
.../org/apache/pulsar/admin/cli/CmdSources.java | 42 +++++++++++++-
.../apache/pulsar/admin/cli/TestCmdSources.java | 65 ++++++++++++++++++++++
.../common/io/BatchSourceConfigParseTest.java | 53 ++++++++++++++++++
.../website/versioned_docs/version-2.7.2/io-cli.md | 2 +
5 files changed, 167 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index a639109..a094ba7 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -85,6 +85,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 90a7d16..9f318c2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SourceConfig;
@@ -315,6 +316,8 @@ public class CmdSources extends CmdBase {
protected String DEPRECATED_sourceConfigString;
@Parameter(names = "--source-config", description = "Source config key/values")
protected String sourceConfigString;
+ @Parameter(names = "--batch-source-config", description = "Batch source config key/values")
+ protected String batchSourceConfigString;
@Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
protected String customRuntimeOptions;
@@ -417,6 +420,10 @@ public class CmdSources extends CmdBase {
if (null != sourceConfigString) {
sourceConfig.setConfigs(parseConfigs(sourceConfigString));
}
+
+ if (null != batchSourceConfigString) {
+ sourceConfig.setBatchSourceConfig(parseBatchSourceConfigs(batchSourceConfigString));
+ }
if (customRuntimeOptions != null) {
sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
@@ -427,7 +434,11 @@ public class CmdSources extends CmdBase {
protected Map<String, Object> parseConfigs(String str) {
Type type = new TypeToken<Map<String, Object>>(){}.getType();
- return new Gson().fromJson(str, type);
+ return new Gson().fromJson(str, type);
+ }
+
+ protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+ return new Gson().fromJson(str, BatchSourceConfig.class);
}
protected void validateSourceConfigs(SourceConfig sourceConfig) {
@@ -444,6 +455,35 @@ public class CmdSources extends CmdBase {
if (isBlank(sourceConfig.getName())) {
throw new IllegalArgumentException("Source name not specified");
}
+
+ if (sourceConfig.getBatchSourceConfig() != null) {
+ validateBatchSourceConfigs(sourceConfig.getBatchSourceConfig());
+ }
+ }
+
+ protected void validateBatchSourceConfigs(BatchSourceConfig batchSourceConfig) {
+ if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) {
+ throw new IllegalArgumentException("Discovery Triggerer not specified");
+ }
+
+ boolean isBatchSourceTriggerer = false;
+
+ try {
+ Class<?>[] interfaces = Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces();
+ int idx = 0;
+
+ while (idx < interfaces.length && !isBatchSourceTriggerer) {
+ isBatchSourceTriggerer = interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer");
+ }
+
+ if (!isBatchSourceTriggerer) {
+ throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
+ }
+
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Invalid Discovery Triggerer specified");
+ }
+
}
protected String validateSourceType(String sourceType) throws IOException {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index d9ef39d..b2137bd 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -33,12 +33,16 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.powermock.api.mockito.PowerMockito;
@@ -74,6 +78,8 @@ public class TestCmdSources {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
+ private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ + "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
private PulsarAdmin pulsarAdmin;
private Sources source;
@@ -133,6 +139,10 @@ public class TestCmdSources {
sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
return sourceConfig;
}
+
+ public BatchSourceConfig getBatchSourceConfig() {
+ return createSource.parseBatchSourceConfigs(BATCH_SOURCE_CONFIG_STRING);
+ }
@Test
public void testCliCorrect() throws Exception {
@@ -390,6 +400,61 @@ public class TestCmdSources {
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}
+ @Test
+ public void testBatchSourceConfigCorrect() throws Exception {
+ SourceConfig testSourceConfig = getSourceConfig();
+ testSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+
+ SourceConfig expectedSourceConfig = getSourceConfig();
+ expectedSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+ testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+ }
+
+ /*
+ * Test where the DiscoveryTriggererClassName is null
+ */
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Discovery Triggerer not specified")
+ public void testBatchSourceConfigMissingDiscoveryTriggererClassName() throws Exception {
+ SourceConfig testSourceConfig = getSourceConfig();
+ BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+ batchSourceConfig.setDiscoveryTriggererClassName(null);
+ testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+
+ SourceConfig expectedSourceConfig = getSourceConfig();
+ expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+ testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+ }
+
+ /*
+ * Test where the class name does not implement the BatchSourceTriggerer interface
+ */
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+ public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() throws Exception {
+ SourceConfig testSourceConfig = getSourceConfig();
+ BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+ batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String");
+ testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+
+ SourceConfig expectedSourceConfig = getSourceConfig();
+ expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+ testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+ }
+
+ /*
+ * Test where the class name provided doesn't exist
+ */
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+ public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws Exception {
+ SourceConfig testSourceConfig = getSourceConfig();
+ BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+ batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar");
+ testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+
+ SourceConfig expectedSourceConfig = getSourceConfig();
+ expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+ testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+ }
+
public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {
File file = Files.createTempFile("", "").toFile();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
new file mode 100644
index 0000000..c761404
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class BatchSourceConfigParseTest {
+
+ private ObjectMapper objectMapper = new ObjectMapper();
+
+ @Test
+ public final void ImmediateTriggererTest() throws JsonMappingException, JsonProcessingException {
+ String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer\" }";
+ BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
+ assertNotNull(config);
+ assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer");
+ }
+
+ @Test
+ public final void CronTriggererTest() throws JsonMappingException, JsonProcessingException {
+ String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+ + "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
+ BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
+ assertNotNull(config);
+ assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.CronTriggerer");
+ assertNotNull(config.getDiscoveryTriggererConfig());
+ assertEquals(config.getDiscoveryTriggererConfig().size(), 1);
+ assertEquals(config.getDiscoveryTriggererConfig().get("cron"), "5 0 0 0 0 *");
+ }
+}
diff --git a/site2/website/versioned_docs/version-2.7.2/io-cli.md b/site2/website/versioned_docs/version-2.7.2/io-cli.md
index 00f2e30..2198a85 100644
--- a/site2/website/versioned_docs/version-2.7.2/io-cli.md
+++ b/site2/website/versioned_docs/version-2.7.2/io-cli.md
@@ -57,6 +57,7 @@ $ pulsar-admin sources create options
|Flag|Description|
|----|---|
| `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
| `--classname` | The source's class name if `archive` is file-url-path (file://).
| `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
| `--deserialization-classname` | The SerDe classname for the source.
@@ -89,6 +90,7 @@ $ pulsar-admin sources update options
|Flag|Description|
|----|---|
| `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/values pairs provided as a JSON string, e.g., { "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }
| `--classname` | The source's class name if `archive` is file-url-path (file://).
| `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
| `--deserialization-classname` | The SerDe classname for the source.