You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/18 04:57:33 UTC

[pulsar] branch master updated: Added log statements to debug PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d69ad8  Added log statements to debug PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)
8d69ad8 is described below

commit 8d69ad86f7726e5798b21bc1bf31baf49f88a29f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 17 21:57:28 2018 -0700

    Added log statements to debug PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)
---
 .../tests/integration/functions/PulsarFunctionsTest.java | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0ea5404..b8452f1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -78,12 +78,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     public void testCassandraArchiveSink() throws Exception {
         testSink(CassandraSinkTester.createTester(false), false);
     }
-    
+
     @Test(enabled = false)
     public void testHdfsSink() throws Exception {
         testSink(new HdfsSinkTester(), false);
     }
-    
+
     @Test
     public void testJdbcSink() throws Exception {
         testSink(new JdbcSinkTester(), true);
@@ -93,7 +93,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     public void testElasticSearchSink() throws Exception {
         testSink(new ElasticSearchSinkTester(), true);
     }
-    
+
     private void testSink(SinkTester tester, boolean builtin) throws Exception {
         tester.startServiceContainer(pulsarCluster);
         try {
@@ -669,9 +669,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
         CommandGenerator generator;
+        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
         if (inputTopicName.endsWith(".*")) {
+            log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
             generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
         } else {
+            log.info("----- CREATING REGULAR FUNCTION --- ");
             generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
         }
         generator.setSinkTopic(outputTopicName);
@@ -685,6 +688,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + runtime);
         }
+
+        log.info("---------- Function command: {}", command);
         String[] commands = {
             "sh", "-c", command
         };
@@ -721,6 +726,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             "--namespace", "default",
             "--name", functionName
         );
+
+        log.info("FUNCTION STATE: {}", result.getStdout());
         assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
     }
 
@@ -795,7 +802,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
 
         for (int i = 0; i < numMessages; i++) {
-            Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+            Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
+            log.info("Received: {}", msg.getValue());
             assertTrue(expectedMessages.contains(msg.getValue()));
             expectedMessages.remove(msg.getValue());
         }