You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/05/25 18:40:04 UTC

[pulsar] branch master updated: [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 85effc4  [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593)
85effc4 is described below

commit 85effc4048199e30bd70edac17bf2d86b49f003b
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Tue May 25 11:39:10 2021 -0700

    [Issue-10109] [admin client] Add --batch-source-config switch to the Pulsar Admin Source API (#10593)
    
    Co-authored-by: David Kjerrumgaard <dk...@splunk.com>
---
 pulsar-client-tools/pom.xml                        |  6 ++
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 42 +++++++++++++-
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 65 ++++++++++++++++++++++
 .../common/io/BatchSourceConfigParseTest.java      | 53 ++++++++++++++++++
 .../website/versioned_docs/version-2.7.2/io-cli.md |  2 +
 5 files changed, 167 insertions(+), 1 deletion(-)

diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index a639109..a094ba7 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -85,6 +85,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+    	<groupId>org.apache.pulsar</groupId>
+    	<artifactId>pulsar-io-batch-discovery-triggerers</artifactId>
+    	<version>${project.version}</version>
+    	<scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 90a7d16..9f318c2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
@@ -315,6 +316,8 @@ public class CmdSources extends CmdBase {
         protected String DEPRECATED_sourceConfigString;
         @Parameter(names = "--source-config", description = "Source config key/values")
         protected String sourceConfigString;
+        @Parameter(names = "--batch-source-config", description = "Batch source config key/values")
+        protected String batchSourceConfigString;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
 
@@ -417,6 +420,10 @@ public class CmdSources extends CmdBase {
             if (null != sourceConfigString) {
                 sourceConfig.setConfigs(parseConfigs(sourceConfigString));
             }
+            
+            if (null != batchSourceConfigString) {
+            	sourceConfig.setBatchSourceConfig(parseBatchSourceConfigs(batchSourceConfigString));
+            }
 
             if (customRuntimeOptions != null) {
                 sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
@@ -427,7 +434,11 @@ public class CmdSources extends CmdBase {
 
         protected Map<String, Object> parseConfigs(String str) {
             Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type);
+            return new Gson().fromJson(str, type); 
+        }
+        
+        protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+        	return new Gson().fromJson(str, BatchSourceConfig.class);
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
@@ -444,6 +455,35 @@ public class CmdSources extends CmdBase {
             if (isBlank(sourceConfig.getName())) {
                 throw new IllegalArgumentException("Source name not specified");
             }
+            
+            if (sourceConfig.getBatchSourceConfig() != null) {
+            	validateBatchSourceConfigs(sourceConfig.getBatchSourceConfig());
+            }
+        }
+        
+        protected void validateBatchSourceConfigs(BatchSourceConfig batchSourceConfig) {
+           if (isBlank(batchSourceConfig.getDiscoveryTriggererClassName())) {
+             throw new IllegalArgumentException("Discovery Triggerer not specified");
+           } 
+           
+           boolean isBatchSourceTriggerer = false;
+           
+           try {
+             Class<?>[] interfaces = Class.forName(batchSourceConfig.getDiscoveryTriggererClassName()).getInterfaces();
+             int idx = 0;
+             
+             while (idx < interfaces.length && !isBatchSourceTriggerer) {
+            	 isBatchSourceTriggerer = interfaces[idx++].getName().equals("org.apache.pulsar.io.core.BatchSourceTriggerer");
+             }
+             
+             if (!isBatchSourceTriggerer) {
+            	 throw new IllegalArgumentException("Invalid Discovery Triggerer specified"); 
+             }
+             
+           } catch (ClassNotFoundException e) {
+             throw new IllegalArgumentException("Invalid Discovery Triggerer specified"); 
+           }
+           
         }
 
         protected String validateSourceType(String sourceType) throws IOException {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index d9ef39d..b2137bd 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -33,12 +33,16 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Sources;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.powermock.api.mockito.PowerMockito;
@@ -74,6 +78,8 @@ public class TestCmdSources {
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
+    private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+			+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
 
     private PulsarAdmin pulsarAdmin;
     private Sources source;
@@ -133,6 +139,10 @@ public class TestCmdSources {
         sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
         return sourceConfig;
     }
+    
+    public BatchSourceConfig getBatchSourceConfig() {
+    	return createSource.parseBatchSourceConfigs(BATCH_SOURCE_CONFIG_STRING);
+    }
 
     @Test
     public void testCliCorrect() throws Exception {
@@ -390,6 +400,61 @@ public class TestCmdSources {
         testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
     }
 
+    @Test
+    public void testBatchSourceConfigCorrect() throws Exception {
+    	SourceConfig testSourceConfig = getSourceConfig();
+    	testSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+    	
+    	SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(getBatchSourceConfig());
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the DiscoveryTriggererClassName is null
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Discovery Triggerer not specified")
+    public void testBatchSourceConfigMissingDiscoveryTriggererClassName() throws Exception {
+    	SourceConfig testSourceConfig = getSourceConfig();
+    	BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+    	batchSourceConfig.setDiscoveryTriggererClassName(null);
+    	testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+    	
+    	SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the class name does not implement the BatchSourceTriggerer interface
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+    public void testBatchSourceConfigInvalidDiscoveryTriggererClassName() throws Exception {
+    	SourceConfig testSourceConfig = getSourceConfig();
+    	BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+    	batchSourceConfig.setDiscoveryTriggererClassName("java.lang.String");
+    	testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+    	
+    	SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
+    /*
+     * Test where the class name provided doesn't exist
+     */
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid Discovery Triggerer specified")
+    public void testBatchSourceConfigDiscoveryTriggererClassNotFound() throws Exception {
+    	SourceConfig testSourceConfig = getSourceConfig();
+    	BatchSourceConfig batchSourceConfig = getBatchSourceConfig();
+    	batchSourceConfig.setDiscoveryTriggererClassName("com.foo.Bar");
+    	testSourceConfig.setBatchSourceConfig(batchSourceConfig);
+    	
+    	SourceConfig expectedSourceConfig = getSourceConfig();
+        expectedSourceConfig.setBatchSourceConfig(batchSourceConfig);
+        testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
+    }
+    
     public void testCmdSourceConfigFile(SourceConfig testSourceConfig, SourceConfig expectedSourceConfig) throws Exception {
 
         File file = Files.createTempFile("", "").toFile();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
new file mode 100644
index 0000000..c761404
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/io/BatchSourceConfigParseTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class BatchSourceConfigParseTest {
+	
+	private ObjectMapper objectMapper = new ObjectMapper();
+
+	@Test
+	public final void ImmediateTriggererTest() throws JsonMappingException, JsonProcessingException {
+		String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer\" }";
+		BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
+		assertNotNull(config);
+		assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer");
+	}
+	
+	@Test
+	public final void CronTriggererTest() throws JsonMappingException, JsonProcessingException {
+		String json = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
+				+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
+		BatchSourceConfig config = objectMapper.readValue(json, BatchSourceConfig.class);
+		assertNotNull(config);
+		assertEquals(config.getDiscoveryTriggererClassName(), "org.apache.pulsar.io.batchdiscovery.CronTriggerer");
+		assertNotNull(config.getDiscoveryTriggererConfig());
+		assertEquals(config.getDiscoveryTriggererConfig().size(), 1);
+		assertEquals(config.getDiscoveryTriggererConfig().get("cron"), "5 0 0 0 0 *");
+	}
+}
diff --git a/site2/website/versioned_docs/version-2.7.2/io-cli.md b/site2/website/versioned_docs/version-2.7.2/io-cli.md
index 00f2e30..2198a85 100644
--- a/site2/website/versioned_docs/version-2.7.2/io-cli.md
+++ b/site2/website/versioned_docs/version-2.7.2/io-cli.md
@@ -57,6 +57,7 @@ $ pulsar-admin sources create options
 |Flag|Description|
 |----|---|
 | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/value pairs provided as a JSON string, e.g., `{ "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }`
 | `--classname` | The source's class name if `archive` is file-url-path (file://).
 | `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
 | `--deserialization-classname` | The SerDe classname for the source.
@@ -89,6 +90,7 @@ $ pulsar-admin sources update options
 |Flag|Description|
 |----|---|
 | `-a`, `--archive` | The path to the NAR archive for the source. <br> It also supports url-path (http/https/file [file protocol assumes that file already exists on worker host]) from which worker can download the package.
+| `--batch-source-config` | BatchSource configuration key/value pairs provided as a JSON string, e.g., `{ "discoveryTriggererClassName" : "org.apache.pulsar.io.batchdiscovery.CronTriggerer", "discoveryTriggererConfig": {"cron": "*/5 * * * *"} }`
 | `--classname` | The source's class name if `archive` is file-url-path (file://).
 | `--cpu` | The CPU (in cores) that needs to be allocated per source instance (applicable only to Docker runtime).
 | `--deserialization-classname` | The SerDe classname for the source.