You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/04 19:54:10 UTC

[pulsar] branch master updated: [tests][integration] Fix errors in integration tests (#2716)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 861f155  [tests][integration] Fix errors in integration tests (#2716)
861f155 is described below

commit 861f155ce853258174915718faa04e56733e5efa
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Thu Oct 4 12:54:02 2018 -0700

    [tests][integration] Fix errors in integration tests (#2716)
    
    - Tick doesn't implement equals
    - TestBasicPresto doesn't run
---
 .../pulsar/functions/api/examples/pojo/Tick.java   | 40 +++-----------
 .../tests/integration/presto/TestBasicPresto.java  | 61 ++++++++++------------
 .../integration/topologies/PulsarCluster.java      | 40 ++++++++++++--
 .../src/test/resources/pulsar-process.xml          |  1 +
 4 files changed, 71 insertions(+), 71 deletions(-)

diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
index fc16e70..67d056e 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.functions.api.examples.pojo;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 /**
  * Pojo to represent a stock tick
  */
+@Data
+@ToString
+@EqualsAndHashCode
 public class Tick {
 
     private long timeStamp;
@@ -28,39 +35,6 @@ public class Tick {
     private double bid;
     private double ask;
 
-    public void setTimeStamp(long timeStamp) {
-        this.timeStamp = timeStamp;
-    }
-
-    public void setStockSymbol(String stockSymbol) {
-        this.stockSymbol = stockSymbol;
-    }
-
-    public void setBid(double bid) {
-        this.bid = bid;
-    }
-
-    public void setAsk(double ask) {
-        this.ask = ask;
-    }
-
-    public long getTimeStamp() {
-
-        return timeStamp;
-    }
-
-    public String getStockSymbol() {
-        return stockSymbol;
-    }
-
-    public double getBid() {
-        return bid;
-    }
-
-    public double getAsk() {
-        return ask;
-    }
-
     public Tick() {}
 
     public Tick(long timeStamp, String stockSymbol, double bid, double ask) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 0007e3d..c51b25f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -23,52 +23,47 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.util.stream.Stream;
-
-import static java.util.stream.Collectors.joining;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @Slf4j
-public class TestBasicPresto extends PulsarClusterTestBase {
+public class TestBasicPresto extends PulsarTestSuite {
 
     private static final int NUM_OF_STOCKS = 10;
 
-    @BeforeSuite
-    @Override
-    public void setupCluster() throws Exception {
-        final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5))
-                .filter(s -> s != null && !s.isEmpty())
-                .collect(joining("-"));
-
-        PulsarClusterSpec spec = PulsarClusterSpec.builder()
-                .numBookies(2)
-                .numBrokers(1)
-                .enablePrestoWorker(true)
-                .clusterName(clusterName)
-                .build();
-
-        log.info("Setting up cluster {} with {} bookies, {} brokers",
-                spec.clusterName(), spec.numBookies(), spec.numBrokers());
-
-        pulsarCluster = PulsarCluster.forSpec(spec);
-        pulsarCluster.start();
-
-        log.info("Cluster {} is setup with presto worker", spec.clusterName());
+    @BeforeClass
+    public void setupPresto() throws Exception {
+        pulsarCluster.startPrestoWorker();
+
+        // wait until presto worker started
+        ContainerExecResult result;
+        do {
+            try {
+                result = execQuery("show catalogs;");
+                assertThat(result.getExitCode()).isEqualTo(0);
+                assertThat(result.getStdout()).contains("pulsar", "system");
+                break;
+            } catch (ContainerExecException cee) {
+                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
+                    Thread.sleep(10000);
+                } else {
+                    throw cee;
+                }
+            }
+        } while (true);
     }
 
-    @Test
-    public void testDefaultCatalog() throws Exception {
-        ContainerExecResult containerExecResult = execQuery("show catalogs;");
-        assertThat(containerExecResult.getExitCode()).isEqualTo(0);
-        assertThat(containerExecResult.getStdout()).contains("pulsar", "system");
+    @AfterClass
+    public void teardownPresto() {
+        pulsarCluster.stopPrestoWorker();;
     }
 
     @Test
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 8ee2f57..7f9986e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -80,7 +80,7 @@ public class PulsarCluster {
     private final Map<String, BrokerContainer> brokerContainers;
     private final Map<String, WorkerContainer> workerContainers;
     private final ProxyContainer proxyContainer;
-    private final PrestoWorkerContainer prestoWorkerContainer;
+    private PrestoWorkerContainer prestoWorkerContainer;
     private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
     private final boolean enablePrestoWorker;
 
@@ -277,10 +277,18 @@ public class PulsarCluster {
             containers.addAll(externalServices.values());
         }
 
-        containers.add(proxyContainer);
-        containers.add(csContainer);
-        containers.add(zkContainer);
-        containers.add(prestoWorkerContainer);
+        if (null != proxyContainer) {
+            containers.add(proxyContainer);
+        }
+        if (null != csContainer) {
+            containers.add(csContainer);
+        }
+        if (null != zkContainer) {
+            containers.add(zkContainer);
+        }
+        if (null != prestoWorkerContainer) {
+            containers.add(prestoWorkerContainer);
+        }
 
         containers.parallelStream()
                 .filter(Objects::nonNull)
@@ -295,6 +303,28 @@ public class PulsarCluster {
         }
     }
 
+    public void startPrestoWorker() {
+        if (null == prestoWorkerContainer) {
+            prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME)
+                    .withNetwork(network)
+                    .withNetworkAliases(PrestoWorkerContainer.NAME)
+                    .withEnv("clusterName", clusterName)
+                    .withEnv("zkServers", ZKContainer.NAME)
+                    .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
+                    .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080");
+        }
+        log.info("Starting Presto Worker");
+        prestoWorkerContainer.start();
+    }
+
+    public void stopPrestoWorker() {
+        if (null != prestoWorkerContainer) {
+            prestoWorkerContainer.stop();
+            log.info("Stopped Presto Worker");
+            prestoWorkerContainer = null;
+        }
+    }
+
     public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) {
         switch (runtimeType) {
             case THREAD:
diff --git a/tests/integration/src/test/resources/pulsar-process.xml b/tests/integration/src/test/resources/pulsar-process.xml
index 482e570..e29a05e 100644
--- a/tests/integration/src/test/resources/pulsar-process.xml
+++ b/tests/integration/src/test/resources/pulsar-process.xml
@@ -25,6 +25,7 @@
             <class name="org.apache.pulsar.tests.integration.cli.CLITest" />
             <class name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
             <class name="org.apache.pulsar.tests.integration.functions.PulsarFunctionsProcessTest" />
+            <class name="org.apache.pulsar.tests.integration.presto.TestBasicPresto" />
         </classes>
     </test>
 </suite>
\ No newline at end of file