You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/30 19:02:07 UTC

[GitHub] sijie closed pull request #2578: Add support for schema extraction from a jar

sijie closed pull request #2578: Add support for schema extraction from a jar
URL: https://github.com/apache/pulsar/pull/2578
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index cab463f3fb..554f290f9b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -22,6 +22,10 @@
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.schema.PostSchemaPayload;
 
@@ -34,6 +38,7 @@ public CmdSchemas(PulsarAdmin admin) {
         jcommander.addCommand("get", new GetSchema());
         jcommander.addCommand("delete", new DeleteSchema());
         jcommander.addCommand("upload", new UploadSchema());
+        jcommander.addCommand("extract", new ExtractSchema());
     }
 
     @Parameters(commandDescription = "Get the schema for a topic")
@@ -83,4 +88,44 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Provide the schema via a topic")
+    private class ExtractSchema extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-j", "--jar" }, description = "jar filepath", required = true)
+        private String jarFilePath;
+
+        @Parameter(names = { "-t", "--type" }, description = "type avro or json", required = true)
+        private String type;
+
+        @Parameter(names = { "-c", "--classname" }, description = "class name of pojo", required = true)
+        private String className;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+
+            File file  = new File(jarFilePath);
+            ClassLoader cl = new URLClassLoader(new URL[]{ file.toURI().toURL() });
+
+            Class cls = cl.loadClass(className);
+
+            PostSchemaPayload input = new PostSchemaPayload();
+
+            if (type.toLowerCase().equalsIgnoreCase("avro")) {
+                input.setType("AVRO");
+                input.setSchema(SchemaExtractor.getAvroSchemaInfo(cls));
+            } else if (type.toLowerCase().equalsIgnoreCase("json")){
+                input.setType("JSON");
+                input.setSchema(SchemaExtractor.getJsonSchemaInfo(cls));
+            }
+            else {
+                throw new Exception("Unknown schema type specified as type");
+            }
+
+            admin.schemas().createSchema(topic, input);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
new file mode 100644
index 0000000000..53269a8200
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/utils/SchemaExtractor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli.utils;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.nio.charset.StandardCharsets;
+
+public class SchemaExtractor {
+
+    public static String getJsonSchemaInfo(Class clazz) {
+
+        return new String(Schema.JSON(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+    }
+
+    public static String getAvroSchemaInfo(Class clazz) {
+
+        return new String(Schema.AVRO(clazz).getSchemaInfo().getSchema(), StandardCharsets.UTF_8);
+    }
+
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
new file mode 100644
index 0000000000..8a94a19670
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples.pojo;
+
+/**
+ * Pojo to represent a stock tick
+ */
+public class Tick {
+
+    private long timeStamp;
+    private String stockSymbol;
+    private double bid;
+    private double ask;
+
+    public void setTimeStamp(long timeStamp) {
+        this.timeStamp = timeStamp;
+    }
+
+    public void setStockSymbol(String stockSymbol) {
+        this.stockSymbol = stockSymbol;
+    }
+
+    public void setBid(double bid) {
+        this.bid = bid;
+    }
+
+    public void setAsk(double ask) {
+        this.ask = ask;
+    }
+
+    public long getTimeStamp() {
+
+        return timeStamp;
+    }
+
+    public String getStockSymbol() {
+        return stockSymbol;
+    }
+
+    public double getBid() {
+        return bid;
+    }
+
+    public double getAsk() {
+        return ask;
+    }
+
+    public Tick(long timeStamp, String stockSymbol, double bid, double ask) {
+
+        this.timeStamp = timeStamp;
+        this.stockSymbol = stockSymbol;
+        this.bid = bid;
+        this.ask = ask;
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 8526ded2e2..4311645a05 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -18,18 +18,30 @@
  */
 package org.apache.pulsar.tests.integration.cli;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.functions.api.examples.pojo.Tick;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 /**
  * Test Pulsar CLI.
  */
@@ -232,4 +244,60 @@ public void testGrantPermissionsAuthorizationDisabled() throws Exception {
         }
     }
 
+    @Test
+    public void testJarPojoSchemaUploadAvro() throws Exception {
+
+        ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+                "schemas",
+                "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "avro",
+                "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+                "persistent://public/default/pojo-avro");
+
+        Assert.assertEquals(containerExecResult.getExitCode(), 0);
+        testPublishAndConsume("persistent://public/default/pojo-avro", "avro", Schema.AVRO(Tick.class));
+    }
+
+    @Test
+    public void testJarPojoSchemaUploadJson() throws Exception {
+
+        ContainerExecResult containerExecResult = pulsarCluster.runAdminCommandOnAnyBroker(
+                "schemas",
+                "extract", "--jar", "/pulsar/examples/api-examples.jar", "--type", "json",
+                "--classname", "org.apache.pulsar.functions.api.examples.pojo.Tick",
+                "persistent://public/default/pojo-json");
+
+        Assert.assertEquals(containerExecResult.getExitCode(), 0);
+        testPublishAndConsume("persistent://public/default/pojo-json", "json", Schema.JSON(Tick.class));
+    }
+
+    private void testPublishAndConsume(String topic, String sub, Schema type) throws PulsarClientException {
+
+        PulsarClient client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+
+        Producer<Tick> producer = client.newProducer(type)
+                .topic(topic + "-message")
+                .create();
+
+        Consumer<Tick> consumer = client.newConsumer(type)
+                .topic(topic + "-message")
+                .subscriptionName(sub)
+                .subscribe();
+
+        final int numOfMessages = 10;
+
+        for (int i = 1; i < numOfMessages; ++i) {
+            producer.send(new Tick(i, "Stock_" + i, 100 + i, 110 + i));
+        }
+
+        for (int i = 1; i < numOfMessages; ++i) {
+            Tick expected = new Tick(i, "Stock_" + i, 100 + i, 110 + i);
+            Message<Tick> receive = consumer.receive(5, TimeUnit.SECONDS);
+            Assert.assertEquals(receive.getValue(), expected);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services