You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/09/09 22:56:33 UTC

[pulsar] 01/01: Change logging for debezium key value pairs

This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch postgres2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fceee7c96312d4aa8b078aae66e03347d5b98381
Author: Ali Ahmed <al...@gmail.com>
AuthorDate: Mon Sep 9 15:55:40 2019 -0700

    Change logging for debezium key value pairs
---
 .../pulsar/tests/integration/io/DebeziumMySqlSourceTester.java      | 6 +++---
 .../pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
index 2d1b4b5..580bf2c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java
@@ -91,9 +91,9 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
-            log.info("Received message: {}.", msg.getValue());
-            String key = new String(msg.getValue().getKey());
-            String value = new String(msg.getValue().getValue());
+            final String key = new String(msg.getValue().getKey());
+            final String value = new String(msg.getValue().getValue());
+            log.info("Received message: key = {}, value = {}.", key, value);
             Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
             Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
             consumer.acknowledge(msg);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
index 8d445f5..c3b6786 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumPostgreSqlSourceTester.java
@@ -92,9 +92,9 @@ public class DebeziumPostgreSqlSourceTester extends SourceTester<DebeziumPostgre
         Message<KeyValue<byte[], byte[]>> msg = consumer.receive(2, TimeUnit.SECONDS);
         while(msg != null) {
             recordsNumber ++;
-            log.info("Received message: {}.", msg.getValue());
-            String key = new String(msg.getValue().getKey());
-            String value = new String(msg.getValue().getValue());
+            final String key = new String(msg.getValue().getKey());
+            final String value = new String(msg.getValue().getValue());
+            log.info("Received message: key = {}, value = {}.", key, value);
             Assert.assertTrue(key.contains("dbserver1.inventory.products.Key"));
             Assert.assertTrue(value.contains("dbserver1.inventory.products.Value"));
             consumer.acknowledge(msg);