You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/30 19:02:10 UTC
[pulsar] branch master updated: Add support for schema extraction
from a jar (#2578)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 671c7bc Add support for schema extraction from a jar (#2578)
671c7bc is described below
commit 671c7bc6eaba0555d18b008e75a02f37ede3677d
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Sun Sep 30 12:02:06 2018 -0700
Add support for schema extraction from a jar (#2578)
Add options to create a schema based on pojo class in a jar.
---
.../org/apache/pulsar/admin/cli/CmdSchemas.java | 45 +++++++++++++
.../pulsar/admin/cli/utils/SchemaExtractor.java | 37 ++++++++++
.../pulsar/functions/api/examples/pojo/Tick.java | 71 ++++++++++++++++++++
.../pulsar/tests/integration/cli/CLITest.java | 78 ++++++++++++++++++++--
4 files changed, 226 insertions(+), 5 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index cab463f..554f290 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -22,6 +22,10 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.schema.PostSchemaPayload;
@@ -34,6 +38,7 @@ public class CmdSchemas extends CmdBase {
jcommander.addCommand("get", new GetSchema());
jcommander.addCommand("delete", new DeleteSchema());
jcommander.addCommand("upload", new UploadSchema());
+ jcommander.addCommand("extract", new ExtractSchema());
}
@Parameters(commandDescription = "Get the schema for a topic")
@@ -83,4 +88,44 @@ public class CmdSchemas extends CmdBase {
}
}
+ @Parameters(commandDescription = "Provide the schema via a topic")
+ private class ExtractSchema extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-j", "--jar" }, description = "jar filepath", required = true)
+ private String jarFilePath;
+
+ @Parameter(names = { "-t", "--type" }, description = "type avro or json", required = true)
+ private String type;
+
+ @Parameter(names = { "-c", "--classname" }, description = "class name of pojo", required = true)
+ private String className;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+
+ File file = new File(jarFilePath);
+ ClassLoader cl = new URLClassLoader(new URL[]{ file.toURI().toURL() });
+
+ Class cls = cl.loadClass(className);
+
+ PostSchemaPayload input = new PostSchemaPayload();
+
+ if (type.toLowerCase().equalsIgnoreCase("avro")) {
+ input.setType("AVRO");
+ input.setSchema(SchemaExtractor.getAvroSchemaInfo(cls));
+ } else if (type.toLowerCase().equalsIgnoreCase("json")){
+ input.setType("JSON");
+ input.setSchema(SchemaExtractor.getJsonSchemaInfo(cls));
+ }
+ else {
+ throw new Exception("Unknown schema type specified as type");
+ }
+
+ admin.schemas().createSchema(topic, input);
+ }
+ }
+
}
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
new file mode 100644
index 0000000..53269a8
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.nio.charset.StandardCharsets;
+
+public class SchemaExtractor {
+
+ public static String getJsonSchemaInfo(Class clazz) {
+
+ return new String(Schema.JSON(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+ }
+
+ public static String getAvroSchemaInfo(Class clazz) {
+
+ return new String(Schema.AVRO(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
new file mode 100644
index 0000000..8a94a19
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples.pojo;
+
+/**
+ * Pojo to represent a stock tick
+ */
+public class Tick {
+
+ private long timeStamp;
+ private String stockSymbol;
+ private double bid;
+ private double ask;
+
+ public void setTimeStamp(long timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public void setStockSymbol(String stockSymbol) {
+ this.stockSymbol = stockSymbol;
+ }
+
+ public void setBid(double bid) {
+ this.bid = bid;
+ }
+
+ public void setAsk(double ask) {
+ this.ask = ask;
+ }
+
+ public long getTimeStamp() {
+
+ return timeStamp;
+ }
+
+ public String getStockSymbol() {
+ return stockSymbol;
+ }
+
+ public double getBid() {
+ return bid;
+ }
+
+ public double getAsk() {
+ return ask;
+ }
+
+ public Tick(long timeStamp, String stockSymbol, double bid, double ask) {
+
+ this.timeStamp = timeStamp;
+ this.stockSymbol = stockSymbol;
+ this.bid = bid;
+ this.ask = ask;
+ }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 8526ded..4311645 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,18 +18,30 @@
*/
package org.apache.pulsar.tests.integration.cli;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.functions.api.examples.pojo.Tick;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
/**
* Test Pulsar CLI.
*/
@@ -232,4 +244,60 @@ public class CLITest extends PulsarTestSuite {
}
}
+ @Test
+ public void testJarPojoSchemaUploadAvro() throws Exception {
+
+ ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+ "schemas",
+ "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "avro",
+ "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+ "persistent://public/default/pojo-avro");
+
+ Assert.assertEquals(containerExecResult.getExitCode(), 0);
+ testPublishAndConsume("persistent://public/default/pojo-avro", "avro", Schema.AVRO(Tick.class));
+ }
+
+ @Test
+ public void testJarPojoSchemaUploadJson() throws Exception {
+
+ ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+ "schemas",
+ "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "json",
+ "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+ "persistent://public/default/pojo-json");
+
+ Assert.assertEquals(containerExecResult.getExitCode(), 0);
+ testPublishAndConsume("persistent://public/default/pojo-json", "json", Schema.JSON(Tick.class));
+ }
+
+ private void testPublishAndConsume(String topic, String sub, Schema type) throws PulsarClientException {
+
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+
+ Producer<Tick> producer = client.newProducer(type)
+ .topic(topic + "-message")
+ .create();
+
+ Consumer<Tick> consumer = client.newConsumer(type)
+ .topic(topic + "-message")
+ .subscriptionName(sub)
+ .subscribe();
+
+ final int numOfMessages = 10;
+
+ for (int i = 1; i < numOfMessages; ++i) {
+ producer.send(new Tick(i, "Stock_" + i, 100 + i, 110 + i));
+ }
+
+ for (int i = 1; i < numOfMessages; ++i) {
+ Tick expected = new Tick(i, "Stock_" + i, 100 + i, 110 + i);
+ Message<Tick> receive = consumer.receive(5, TimeUnit.SECONDS);
+ Assert.assertEquals(receive.getValue(), expected);
+ }
+
+ producer.close();
+ consumer.close();
+ client.close();
+ }
+
}