You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/15 13:45:53 UTC

[camel-kafka-connector] 02/06: core: compute content log level once upon startup

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 7a0e751ee9ee2643627a344465e954765676f7e0
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 15 12:18:27 2020 +0200

    core: compute content log level once upon startup
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 14 ++++-
 .../camel/kafkaconnector/CamelSourceTask.java      | 14 ++++-
 .../camel/kafkaconnector/utils/TaskHelper.java     | 58 ++++++++-----------
 .../camel/kafkaconnector/utils/TaskHelperTest.java | 66 ++++++++--------------
 4 files changed, 67 insertions(+), 85 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 25d94f1..e64ee16 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
@@ -55,8 +56,8 @@ public class CamelSinkTask extends SinkTask {
 
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
-    private CamelSinkConnectorConfig config;
     private Endpoint localEndpoint;
+    private LoggingLevel loggingLevel = LoggingLevel.OFF;
 
     @Override
     public String version() {
@@ -68,7 +69,14 @@ public class CamelSinkTask extends SinkTask {
         try {
             LOG.info("Starting CamelSinkTask connector task");
             Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
-            config = getCamelSinkConnectorConfig(actualProps);
+            CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);
+
+            try {
+                String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); // enum constants are upper-case
+            } catch (Exception e) { // missing or unrecognised value: loggingLevel keeps its OFF default
+                LOG.debug("Invalid value for " + CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF + " property");
+            }
 
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
@@ -124,7 +132,7 @@ public class CamelSinkTask extends SinkTask {
     @Override
     public void put(Collection<SinkRecord> sinkRecords) {
         for (SinkRecord record : sinkRecords) {
-            TaskHelper.logRecordContent(LOG, record, config);
+            TaskHelper.logRecordContent(LOG, loggingLevel, record);
 
             Exchange exchange = new DefaultExchange(producer.getCamelContext());
             exchange.getMessage().setBody(record.value());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index c793f1b..247edd3 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
@@ -52,12 +53,12 @@ public class CamelSourceTask extends SourceTask {
     private static final String LOCAL_URL = "direct:end";
 
     private CamelKafkaConnectMain cms;
-    private CamelSourceConnectorConfig config;
     private PollingConsumer consumer;
     private String[] topics;
     private Long maxBatchPollSize;
     private Long maxPollDuration;
     private String camelMessageHeaderKey;
+    private LoggingLevel loggingLevel = LoggingLevel.OFF;
 
     @Override
     public String version() {
@@ -69,7 +70,14 @@ public class CamelSourceTask extends SourceTask {
         try {
             LOG.info("Starting CamelSourceTask connector task");
             Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
-            config = getCamelSourceConnectorConfig(actualProps);
+            CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps);
+
+            try {
+                String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase()); // enum constants are upper-case; valueOf is case-sensitive
+            } catch (Exception e) { // missing or unrecognised value: loggingLevel keeps its OFF default
+                LOG.debug("Invalid value for " + CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF + " property");
+            }
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
             maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
@@ -156,7 +164,7 @@ public class CamelSourceTask extends SourceTask {
                     setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                 }
 
-                TaskHelper.logRecordContent(LOG, record, config);
+                TaskHelper.logRecordContent(LOG, loggingLevel, record);
                 records.add(record);
             }
             collectedRecords++;
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index ecdb5a3..e7f8c83 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -24,14 +24,10 @@ import java.util.Set;
 
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
-import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
 import org.apache.camel.tooling.model.BaseOptionModel;
 import org.apache.camel.tooling.model.ComponentModel;
 import org.apache.camel.tooling.model.JsonMapper;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 
 public final class TaskHelper {
@@ -98,37 +94,29 @@ public final class TaskHelper {
         return false;
     }
 
-    public static <CFG extends AbstractConfig> void logRecordContent(Logger logger, ConnectRecord<?> record, CFG config) {
-        if (logger != null && record != null && config != null) {
-            // do not log record's content by default, as it may contain sensitive information
-            LoggingLevel level = LoggingLevel.OFF;
-            try {
-                final String key = (record instanceof SourceRecord)
-                    ? CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF
-                    : CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF;
-                level = LoggingLevel.valueOf(config.getString(key).toUpperCase());
-            } catch (Exception e) {
-                logger.warn("Invalid value for contentLogLevel property");
-            }
-            switch (level) {
-                case TRACE:
-                    logger.trace(record.toString());
-                    break;
-                case DEBUG:
-                    logger.debug(record.toString());
-                    break;
-                case INFO:
-                    logger.info(record.toString());
-                    break;
-                case WARN:
-                    logger.warn(record.toString());
-                    break;
-                case ERROR:
-                    logger.error(record.toString());
-                    break;
-                default:
-                    break;
-            }
+    public static void logRecordContent(Logger logger, LoggingLevel level, ConnectRecord<?> record) {
+        if (logger == null || level == null || record == null) {
+            return; // nothing to log with or about; a null level would also NPE in the switch below
+        }
+        // LoggingLevel.OFF, like any level without a case, lands in default and logs nothing.
+        switch (level) {
+            case TRACE:
+                logger.trace(record.toString());
+                break;
+            case DEBUG:
+                logger.debug(record.toString());
+                break;
+            case INFO:
+                logger.info(record.toString());
+                break;
+            case WARN:
+                logger.warn(record.toString());
+                break;
+            case ERROR:
+                logger.error(record.toString());
+                break;
+            default:
+                break;
+        }
+    }
 
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 5da421c..f8e4758 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -27,13 +27,13 @@ import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.ext.LoggerWrapper;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -144,58 +144,36 @@ public class TaskHelperTest {
         assertEquals("cql:localhost:8080/test", result);
     }
 
-    private CamelSourceConnectorConfig getSourceConnectorConfig(String logLevel) {
-        return new CamelSourceConnectorConfig(CamelSourceConnectorConfig.conf(),
-            Collections.singletonMap(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, logLevel));
-    }
-
     @Test
     public void testlogRecordContent() {
-        String partName = "abc123";
-        Logger logger = new MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
-        SourceRecord record = new SourceRecord(Collections.singletonMap("partition", partName),
+        final String partName = "abc123";
+        final MyLogger logger = new MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
+        final SourceRecord record = new SourceRecord(
+            Collections.singletonMap("partition", partName),
             Collections.singletonMap("offset", "0"), null, null, null, null);
-        Queue<String> logEvents = ((MyLogger)logger).getEvents();
-
-        String offLevel = LoggingLevel.OFF.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(offLevel));
-        assertNull(logEvents.poll());
-
-        String traceLevel = LoggingLevel.TRACE.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(traceLevel));
-        assertTrue(logEvents.peek().contains(traceLevel) && logEvents.poll().contains(partName));
-
-        String debugLevel = LoggingLevel.DEBUG.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(debugLevel));
-        assertTrue(logEvents.peek().contains(debugLevel) && logEvents.poll().contains(partName));
-
-        String infoLevel = LoggingLevel.INFO.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(infoLevel));
-        assertTrue(logEvents.peek().contains(infoLevel) && logEvents.poll().contains(partName));
-
-        String warnLevel = LoggingLevel.WARN.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(warnLevel));
-        assertTrue(logEvents.peek().contains(warnLevel) && logEvents.poll().contains(partName));
 
-        String errorLevel = LoggingLevel.ERROR.toString();
-        TaskHelper.logRecordContent(logger,  record, getSourceConnectorConfig(errorLevel));
-        assertTrue(logEvents.peek().contains(errorLevel) && logEvents.poll().contains(partName));
+        TaskHelper.logRecordContent(logger, LoggingLevel.OFF, record);
+        assertNull(logger.getEvents().poll());
 
-        TaskHelper.logRecordContent(null, record, getSourceConnectorConfig(debugLevel));
-        assertNull(logEvents.poll());
+        TaskHelper.logRecordContent(logger, LoggingLevel.TRACE, record);
+        assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.TRACE.toString());
+        assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
 
-        TaskHelper.logRecordContent(logger,  null, getSourceConnectorConfig(debugLevel));
-        assertNull(logEvents.poll());
+        TaskHelper.logRecordContent(logger,  LoggingLevel.DEBUG, record);
+        assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.DEBUG.toString());
+        assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
 
-        TaskHelper.logRecordContent(logger,  record, null);
-        assertNull(logEvents.poll());
+        TaskHelper.logRecordContent(logger,  LoggingLevel.INFO, record);
+        assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.INFO.toString());
+        assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
 
-        String invalidLevel = "NOLOG";
-        TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(invalidLevel));
-        assertTrue(logEvents.poll().contains(warnLevel));
+        TaskHelper.logRecordContent(logger,  LoggingLevel.WARN, record);
+        assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.WARN.toString());
+        assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
 
-        TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(null));
-        assertTrue(logEvents.poll().contains(warnLevel));
+        TaskHelper.logRecordContent(logger,  LoggingLevel.ERROR, record);
+        assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.ERROR.toString());
+        assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
     }
 
     class MyLogger extends LoggerWrapper {