You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/10/18 04:57:33 UTC
[pulsar] branch master updated: Added log statements to debug
PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d69ad8 Added log statements to debug PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)
8d69ad8 is described below
commit 8d69ad86f7726e5798b21bc1bf31baf49f88a29f
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Oct 17 21:57:28 2018 -0700
Added log statements to debug PulsarFunctionsTest.testJavaExclamationTopicPatternFunction (#2806)
---
.../tests/integration/functions/PulsarFunctionsTest.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0ea5404..b8452f1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -78,12 +78,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
public void testCassandraArchiveSink() throws Exception {
testSink(CassandraSinkTester.createTester(false), false);
}
-
+
@Test(enabled = false)
public void testHdfsSink() throws Exception {
testSink(new HdfsSinkTester(), false);
}
-
+
@Test
public void testJdbcSink() throws Exception {
testSink(new JdbcSinkTester(), true);
@@ -93,7 +93,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
public void testElasticSearchSink() throws Exception {
testSink(new ElasticSearchSinkTester(), true);
}
-
+
private void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.startServiceContainer(pulsarCluster);
try {
@@ -669,9 +669,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String functionClass,
Schema<T> inputTopicSchema) throws Exception {
CommandGenerator generator;
+ log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
+ log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
} else {
+ log.info("----- CREATING REGULAR FUNCTION --- ");
generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
}
generator.setSinkTopic(outputTopicName);
@@ -685,6 +688,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
} else {
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}
+
+ log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
@@ -721,6 +726,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
"--namespace", "default",
"--name", functionName
);
+
+ log.info("FUNCTION STATE: {}", result.getStdout());
assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\""));
}
@@ -795,7 +802,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
for (int i = 0; i < numMessages; i++) {
- Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+ Message<String> msg = consumer.receive(30, TimeUnit.SECONDS);
+ log.info("Received: {}", msg.getValue());
assertTrue(expectedMessages.contains(msg.getValue()));
expectedMessages.remove(msg.getValue());
}