You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/30 19:02:10 UTC

[pulsar] branch master updated: Add support for schema extraction from a jar (#2578)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 671c7bc  Add support for schema extraction from a jar (#2578)
671c7bc is described below

commit 671c7bc6eaba0555d18b008e75a02f37ede3677d
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Sun Sep 30 12:02:06 2018 -0700

    Add support for schema extraction from a jar (#2578)
    
    Add options to create a schema based on pojo class in a jar.
---
 .../org/apache/pulsar/admin/cli/CmdSchemas.java    | 45 +++++++++++++
 .../pulsar/admin/cli/utils/SchemaExtractor.java    | 37 ++++++++++
 .../pulsar/functions/api/examples/pojo/Tick.java   | 71 ++++++++++++++++++++
 .../pulsar/tests/integration/cli/CLITest.java      | 78 ++++++++++++++++++++--
 4 files changed, 226 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index cab463f..554f290 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -22,6 +22,10 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.schema.PostSchemaPayload;
 
@@ -34,6 +38,7 @@ public class CmdSchemas extends CmdBase {
         jcommander.addCommand("get", new GetSchema());
         jcommander.addCommand("delete", new DeleteSchema());
         jcommander.addCommand("upload", new UploadSchema());
+        jcommander.addCommand("extract", new ExtractSchema());
     }
 
     @Parameters(commandDescription = "Get the schema for a topic")
@@ -83,4 +88,44 @@ public class CmdSchemas extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Provide the schema via a topic")
+    private class ExtractSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-j", "--jar" }, description = "jar filepath", required = true)
+        private String jarFilePath;
+
+        @Parameter(names = { "-t", "--type" }, description = "type avro or json", required = true)
+        private String type;
+
+        @Parameter(names = { "-c", "--classname" }, description = "class name of pojo", required = true)
+        private String className;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+
+            File file  = new File(jarFilePath);
+            ClassLoader cl = new URLClassLoader(new URL[]{ file.toURI().toURL() });
+
+            Class cls = cl.loadClass(className);
+
+            PostSchemaPayload input = new PostSchemaPayload();
+
+            if (type.toLowerCase().equalsIgnoreCase("avro")) {
+                input.setType("AVRO");
+                input.setSchema(SchemaExtractor.getAvroSchemaInfo(cls));
+            } else if (type.toLowerCase().equalsIgnoreCase("json")){
+                input.setType("JSON");
+                input.setSchema(SchemaExtractor.getJsonSchemaInfo(cls));
+            }
+            else {
+                throw new Exception("Unknown schema type specified as type");
+            }
+
+            admin.schemas().createSchema(topic, input);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
new file mode 100644
index 0000000..53269a8
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.nio.charset.StandardCharsets;
+
+public class SchemaExtractor {
+
+    public static String getJsonSchemaInfo(Class clazz) {
+
+        return new String(Schema.JSON(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+    }
+
+    public static String getAvroSchemaInfo(Class clazz) {
+
+        return new String(Schema.AVRO(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+    }
+
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
new file mode 100644
index 0000000..8a94a19
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples.pojo;
+
+/**
+ * Pojo to represent a stock tick
+ */
+public class Tick {
+
+    private long timeStamp;
+    private String stockSymbol;
+    private double bid;
+    private double ask;
+
+    public void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    public void setStockSymbol(String stockSymbol) {
+        this.stockSymbol = stockSymbol;
+    }
+
+    public void setBid(double bid) {
+        this.bid = bid;
+    }
+
+    public void setAsk(double ask) {
+        this.ask = ask;
+    }
+
+    public long getTimeStamp() {
+
+        return timeStamp;
+    }
+
+    public String getStockSymbol() {
+        return stockSymbol;
+    }
+
+    public double getBid() {
+        return bid;
+    }
+
+    public double getAsk() {
+        return ask;
+    }
+
+    public Tick(long timeStamp, String stockSymbol, double bid, double ask) {
+
+        this.timeStamp = timeStamp;
+        this.stockSymbol = stockSymbol;
+        this.bid = bid;
+        this.ask = ask;
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 8526ded..4311645 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,18 +18,30 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.functions.api.examples.pojo.Tick;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 /**
  * Test Pulsar CLI.
  */
@@ -232,4 +244,60 @@ public class CLITest extends PulsarTestSuite {
         }
     }
 
+    @Test
+    public void testJarPojoSchemaUploadAvro() throws Exception {
+
+        ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+                "schemas",
+                "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "avro",
+                "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+                "persistent://public/default/pojo-avro");
+
+        Assert.assertEquals(containerExecResult.getExitCode(), 0);
+        testPublishAndConsume("persistent://public/default/pojo-avro", "avro", Schema.AVRO(Tick.class));
+    }
+
+    @Test
+    public void testJarPojoSchemaUploadJson() throws Exception {
+
+        ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+                "schemas",
+                "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "json",
+                "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+                "persistent://public/default/pojo-json");
+
+        Assert.assertEquals(containerExecResult.getExitCode(), 0);
+        testPublishAndConsume("persistent://public/default/pojo-json", "json", Schema.JSON(Tick.class));
+    }
+
+    private void testPublishAndConsume(String topic, String sub, Schema type) throws PulsarClientException {
+
+        PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+
+        Producer<Tick> producer = client.newProducer(type)
+                .topic(topic + "-message")
+                .create();
+
+        Consumer<Tick> consumer = client.newConsumer(type)
+                .topic(topic + "-message")
+                .subscriptionName(sub)
+                .subscribe();
+
+        final int numOfMessages = 10;
+
+        for (int i = 1; i < numOfMessages; ++i) {
+            producer.send(new Tick(i, "Stock_" + i, 100 + i, 110 + i));
+        }
+
+        for (int i = 1; i < numOfMessages; ++i) {
+            Tick expected = new Tick(i, "Stock_" + i, 100 + i, 110 + i);
+            Message<Tick> receive = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertEquals(receive.getValue(), expected);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
 }