You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/09/09 22:56:33 UTC
[pulsar] 01/01: Change logging for debezium key value pairs
This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch postgres2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fceee7c96312d4aa8b078aae66e03347d5b98381
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Mon Sep 9 15:55:40 2019 -0700
Change logging for debezium key value pairs
---
.../pulsar/tests/integration/io/DebeziumMySqlSourceTester.java | 6 +++---
.../pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java | 6 +++---
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 2d1b4b5..580bf2c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -91,9 +91,9 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
- log.info("Received message: {}.", msg.getValue());
- String key = new String(msg.getValue().getKey());
- String value = new String(msg.getValue().getValue());
+ final String key = new String(msg.getValue().getKey());
+ final String value = new String(msg.getValue().getValue());
+ log.info("Received message: key = {}, value = {}.", key, value);
Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
consumer.acknowledge(msg);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index 8d445f5..c3b6786 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -92,9 +92,9 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre
Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
while(msg != null) {
recordsNumber ++;
- log.info("Received message: {}.", msg.getValue());
- String key = new String(msg.getValue().getKey());
- String value = new String(msg.getValue().getValue());
+ final String key = new String(msg.getValue().getKey());
+ final String value = new String(msg.getValue().getValue());
+ log.info("Received message: key = {}, value = {}.", key, value);
Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
consumer.acknowledge(msg);