You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/02 20:49:31 UTC

[GitHub] srkukarni closed pull request #2504: Add Presto Sql Test

srkukarni closed pull request #2504: Add Presto Sql Test
URL: https://github.com/apache/pulsar/pull/2504
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index ddc7dee7e0..27a44c15a8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -843,6 +843,11 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>cassandra-driver-core</artifactId>
         <version>${cassandra.version}</version>
       </dependency>
+      <dependency>
+    	<groupId>org.assertj</groupId>
+    	<artifactId>assertj-core</artifactId>
+    	<version>3.11.1</version>
+	  </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -879,6 +884,12 @@ flexible messaging model and an intuitive client API.</description>
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+    	<groupId>org.assertj</groupId>
+    	<artifactId>assertj-core</artifactId>
+    	<scope>test</scope>
+	 </dependency>
+
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>lombok</artifactId>
diff --git a/tests/docker-images/latest-version-image/Dockerfile b/tests/docker-images/latest-version-image/Dockerfile
index 862b53cd05..491a913098 100644
--- a/tests/docker-images/latest-version-image/Dockerfile
+++ b/tests/docker-images/latest-version-image/Dockerfile
@@ -25,11 +25,11 @@ RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsa
 
 COPY conf/supervisord.conf /etc/supervisord.conf
 COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \
-     conf/proxy.conf /etc/supervisord/conf.d/
+     conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/
 
 COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
      ssl/admin.key-pk8.pem ssl/admin.cert.pem /pulsar/ssl/
 
 COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
-     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh \
+     scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \
      /pulsar/bin/
diff --git a/tests/docker-images/latest-version-image/conf/presto_worker.conf b/tests/docker-images/latest-version-image/conf/presto_worker.conf
new file mode 100644
index 0000000000..74172ae092
--- /dev/null
+++ b/tests/docker-images/latest-version-image/conf/presto_worker.conf
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+[program:presto-worker]
+autostart=false
+redirect_stderr=true
+stdout_logfile=/var/log/pulsar/presto_worker.log
+directory=/pulsar
+environment=PULSAR_MEM=-Xms128M
+command=/pulsar/bin/pulsar sql-worker start
\ No newline at end of file
diff --git a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
new file mode 100755
index 0000000000..a78f955f7a
--- /dev/null
+++ b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh
@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \
+    bin/apply-config-from-env.py conf/pulsar_env.sh
+
+if [ -z "$NO_AUTOSTART" ]; then
+    sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/presto_worker.conf
+fi
+
+bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
+exec /usr/bin/supervisord -c /etc/supervisord.conf
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index f4712c8ad1..fc5ae42c78 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -77,6 +77,12 @@
     <dependency>
       <groupId>com.datastax.cassandra</groupId>
       <artifactId>cassandra-driver-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-handler</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -116,6 +122,7 @@
       <artifactId>jackson-databind</artifactId>
       <scope>test</scope>
     </dependency>
+
     <dependency>
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
@@ -127,7 +134,7 @@
   	  <artifactId>elasticsearch-rest-high-level-client</artifactId>
   	  <version>6.3.2</version>
   	</dependency>
-  	
+
   </dependencies>
 
   <build>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
new file mode 100644
index 0000000000..edb0a0c34f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+/**
+ * A pulsar container that runs the presto worker
+ */
+public class PrestoWorkerContainer extends PulsarContainer<PrestoWorkerContainer> {
+
+    public static final String NAME = "presto-worker";
+    public static final int PRESTO_HTTP_PORT = 8081;
+
+    public PrestoWorkerContainer(String clusterName, String hostname) {
+        super(
+                clusterName,
+                hostname,
+                hostname,
+                "bin/run-presto-worker.sh",
+                -1,
+                PRESTO_HTTP_PORT,
+                "/v1/node");
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java
new file mode 100644
index 0000000000..f5d85f159c
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/Stock.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import java.util.Objects;
+
+public class Stock {
+
+    private int entryId;
+    private String symbol;
+    private double sharePrice;
+
+    public Stock(int entryId, String symbol, double sharePrice) {
+        this.entryId = entryId;
+        this.symbol = symbol;
+        this.sharePrice = sharePrice;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Stock stock = (Stock) o;
+        return entryId == stock.entryId &&
+                Double.compare(stock.sharePrice, sharePrice) == 0 &&
+                Objects.equals(symbol, stock.symbol);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(symbol, sharePrice);
+    }
+
+    @Override
+    public String toString() {
+        return "Stock{" +
+                "entryId=" + entryId +
+                ", symbol='" + symbol + '\'' +
+                ", sharePrice=" + sharePrice +
+                '}';
+    }
+
+    public int getEntryId() {
+        return entryId;
+    }
+
+    public void setEntryId(int entryId) {
+        this.entryId = entryId;
+    }
+
+    public String getSymbol() {
+        return symbol;
+    }
+
+    public void setSymbol(String symbol) {
+        this.symbol = symbol;
+    }
+
+    public double getSharePrice() {
+        return sharePrice;
+    }
+
+    public void setSharePrice(double sharePrice) {
+        this.sharePrice = sharePrice;
+    }
+}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
new file mode 100644
index 0000000000..0007e3d970
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.presto;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Slf4j
+public class TestBasicPresto extends PulsarClusterTestBase {
+
+    private static final int NUM_OF_STOCKS = 10;
+
+    @BeforeSuite
+    @Override
+    public void setupCluster() throws Exception {
+        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
+                .filter(s -> s != null && !s.isEmpty())
+                .collect(joining("-"));
+
+        PulsarClusterSpec spec = PulsarClusterSpec.builder()
+                .numBookies(2)
+                .numBrokers(1)
+                .enablePrestoWorker(true)
+                .clusterName(clusterName)
+                .build();
+
+        log.info("Setting up cluster {} with {} bookies, {} brokers",
+                spec.clusterName(), spec.numBookies(), spec.numBrokers());
+
+        pulsarCluster = PulsarCluster.forSpec(spec);
+        pulsarCluster.start();
+
+        log.info("Cluster {} is setup with presto worker", spec.clusterName());
+    }
+
+    @Test
+    public void testDefaultCatalog() throws Exception {
+        ContainerExecResult containerExecResult = execQuery("show catalogs;");
+        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+        assertThat(containerExecResult.getStdout()).contains("pulsar", "system");
+    }
+
+    @Test
+    public void testSimpleSQLQuery() throws Exception {
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder()
+                                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                                    .build();
+
+        final String stocksTopic = "stocks";
+
+        @Cleanup
+        Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
+                .topic(stocksTopic)
+                .create();
+
+
+        for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
+            final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
+            producer.send(stock);
+        }
+
+        ContainerExecResult containerExecResult = execQuery("select * from pulsar.\"public/default\".stocks order by entryid;");
+        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
+        log.info("select sql query output \n{}", containerExecResult.getStdout());
+        String[] split = containerExecResult.getStdout().split("\n");
+        assertThat(split.length).isGreaterThan(NUM_OF_STOCKS - 2);
+
+        String[] split2 = containerExecResult.getStdout().split("\n|,");
+
+        for (int i = 0; i < NUM_OF_STOCKS - 2; ++i) {
+            assertThat(split2).contains("\"" + i + "\"");
+            assertThat(split2).contains("\"" + "STOCK_" + i + "\"");
+            assertThat(split2).contains("\"" + (100.0 + i * 10) + "\"");
+        }
+
+    }
+
+    @AfterSuite
+    @Override
+    public void tearDownCluster() {
+        super.tearDownCluster();
+    }
+
+    public static ContainerExecResult execQuery(final String query) throws Exception {
+        ContainerExecResult containerExecResult;
+
+        containerExecResult = pulsarCluster.getPrestoWorkerContainer()
+                .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'");
+
+        return containerExecResult;
+
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index af6071220d..8ee2f57327 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -25,19 +25,21 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.function.Function;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
 
-import com.google.common.collect.Streams;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.tests.integration.containers.BKContainer;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.apache.pulsar.tests.integration.containers.PrestoWorkerContainer;
 import org.apache.pulsar.tests.integration.containers.ProxyContainer;
 import org.apache.pulsar.tests.integration.containers.PulsarContainer;
 import org.apache.pulsar.tests.integration.containers.WorkerContainer;
@@ -78,13 +80,29 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
     private final Map<String, BrokerContainer> brokerContainers;
     private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
+    private final PrestoWorkerContainer prestoWorkerContainer;
     private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
+    private final boolean enablePrestoWorker;
 
     private PulsarCluster(PulsarClusterSpec spec) {
 
         this.spec = spec;
         this.clusterName = spec.clusterName();
         this.network = Network.newNetwork();
+        this.enablePrestoWorker = spec.enablePrestoWorker();
+
+        if (enablePrestoWorker) {
+            prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
+                    .withNetwork(network)
+                    .withNetworkAliases(PrestoWorkerContainer.NAME)
+                    .withEnv("clusterName", clusterName)
+                    .withEnv("zkServers", ZKContainer.NAME)
+                    .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                    .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+        } else {
+            prestoWorkerContainer = null;
+        }
+
 
         this.zkContainer = new ZKContainer(clusterName);
         this.zkContainer
@@ -196,6 +214,11 @@ public void start() throws Exception {
         log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
         log.info("\tHttp Service Url : {}", getHttpServiceUrl());
 
+        if (enablePrestoWorker) {
+            log.info("Starting Presto Worker");
+            prestoWorkerContainer.start();
+        }
+
         // start external services
         this.externalServices = spec.externalServices;
         if (null != externalServices) {
@@ -238,19 +261,32 @@ public void stopService(String networkAlias,
         return containers;
     }
 
+    public PrestoWorkerContainer getPrestoWorkerContainer() {
+        return prestoWorkerContainer;
+    }
+
     public synchronized void stop() {
-        Stream<GenericContainer> containers = Streams.concat(
-                workerContainers.values().stream(),
-                brokerContainers.values().stream(),
-                bookieContainers.values().stream(),
-                Stream.of(proxyContainer, csContainer, zkContainer)
-        );
 
-        if (spec.externalServices() != null) {
-            containers = Streams.concat(containers, spec.externalServices().values().stream());
+        List<GenericContainer> containers = new ArrayList<>();
+
+        containers.addAll(workerContainers.values());
+        containers.addAll(brokerContainers.values());
+        containers.addAll(bookieContainers.values());
+
+        if (externalServices != null) {
+            containers.addAll(externalServices.values());
         }
 
-        containers.parallel().forEach(GenericContainer::stop);
+        containers.add(proxyContainer);
+        containers.add(csContainer);
+        containers.add(zkContainer);
+        containers.add(prestoWorkerContainer);
+
+        containers.parallelStream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        containers.parallelStream().forEach(GenericContainer::stop);
 
         try {
             network.close();
@@ -349,11 +385,19 @@ public void stopContainers(Map<String, GenericContainer<?>> containers) {
     }
 
     public BrokerContainer getAnyBroker() {
-        return getAnyContainer(brokerContainers, "broker");
+        return getAnyContainer(brokerContainers, "pulsar-broker");
     }
 
     public synchronized WorkerContainer getAnyWorker() {
-        return getAnyContainer(workerContainers, "functions-worker");
+        return getAnyContainer(workerContainers, "pulsar-functions-worker");
+    }
+
+    public BrokerContainer getBroker(int index) {
+        return getAnyContainer(brokerContainers, "pulsar-broker", index);
+    }
+
+    public synchronized WorkerContainer getWorker(int index) {
+        return getAnyContainer(workerContainers, "pulsar-functions-worker", index);
     }
 
     private <T> T getAnyContainer(Map<String, T> containers, String serviceName) {
@@ -364,6 +408,12 @@ public synchronized WorkerContainer getAnyWorker() {
         return containerList.get(0);
     }
 
+    private <T> T getAnyContainer(Map<String, T> containers, String serviceName, int index) {
+        checkArgument(!containers.isEmpty(), "No " + serviceName + " is alive");
+        checkArgument((index >= 0 && index < containers.size()), "Index : " + index + " is out range");
+        return containers.get(serviceName.toLowerCase() + "-" + index);
+    }
+
     public Collection<BrokerContainer> getBrokers() {
         return brokerContainers.values();
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
index f49200ecb4..3aeb6c606c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterSpec.java
@@ -78,6 +78,15 @@
     @Default
     int numFunctionWorkers = 0;
 
+
+    /**
+     * Enable a Preto Worker Node
+     *
+     * @return the flag whether presto worker is eanbled
+     */
+    @Default
+    boolean enablePrestoWorker = false;
+
     /**
      * Returns the function runtime type.
      *


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services