You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/10/15 13:45:53 UTC
[camel-kafka-connector] 02/06: core: compute content log level once
upon startup
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 7a0e751ee9ee2643627a344465e954765676f7e0
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Thu Oct 15 12:18:27 2020 +0200
core: compute content log level once upon startup
---
.../apache/camel/kafkaconnector/CamelSinkTask.java | 14 ++++-
.../camel/kafkaconnector/CamelSourceTask.java | 14 ++++-
.../camel/kafkaconnector/utils/TaskHelper.java | 58 ++++++++-----------
.../camel/kafkaconnector/utils/TaskHelperTest.java | 66 ++++++++--------------
4 files changed, 67 insertions(+), 85 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 25d94f1..e64ee16 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -25,6 +25,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
@@ -55,8 +56,8 @@ public class CamelSinkTask extends SinkTask {
private CamelKafkaConnectMain cms;
private ProducerTemplate producer;
- private CamelSinkConnectorConfig config;
private Endpoint localEndpoint;
+ private LoggingLevel loggingLevel = LoggingLevel.OFF;
@Override
public String version() {
@@ -68,7 +69,14 @@ public class CamelSinkTask extends SinkTask {
try {
LOG.info("Starting CamelSinkTask connector task");
Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
- config = getCamelSinkConnectorConfig(actualProps);
+ CamelSinkConnectorConfig config = getCamelSinkConnectorConfig(actualProps);
+
+ try {
+ String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+ loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
+ } catch (Exception e) {
+ LOG.debug("Invalid value for " + CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF + "property");
+ }
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
@@ -124,7 +132,7 @@ public class CamelSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> sinkRecords) {
for (SinkRecord record : sinkRecords) {
- TaskHelper.logRecordContent(LOG, record, config);
+ TaskHelper.logRecordContent(LOG, loggingLevel, record);
Exchange exchange = new DefaultExchange(producer.getCamelContext());
exchange.getMessage().setBody(record.value());
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index c793f1b..247edd3 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.LoggingLevel;
import org.apache.camel.PollingConsumer;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain;
@@ -52,12 +53,12 @@ public class CamelSourceTask extends SourceTask {
private static final String LOCAL_URL = "direct:end";
private CamelKafkaConnectMain cms;
- private CamelSourceConnectorConfig config;
private PollingConsumer consumer;
private String[] topics;
private Long maxBatchPollSize;
private Long maxPollDuration;
private String camelMessageHeaderKey;
+ private LoggingLevel loggingLevel = LoggingLevel.OFF;
@Override
public String version() {
@@ -69,7 +70,14 @@ public class CamelSourceTask extends SourceTask {
try {
LOG.info("Starting CamelSourceTask connector task");
Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
- config = getCamelSourceConnectorConfig(actualProps);
+ CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps);
+
+ try {
+ String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+ loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+ } catch (Exception e) {
+ LOG.debug("Invalid value for " + CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF + "property");
+ }
maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
maxPollDuration = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF);
@@ -156,7 +164,7 @@ public class CamelSourceTask extends SourceTask {
setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
}
- TaskHelper.logRecordContent(LOG, record, config);
+ TaskHelper.logRecordContent(LOG, loggingLevel, record);
records.add(record);
}
collectedRecords++;
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index ecdb5a3..e7f8c83 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -24,14 +24,10 @@ import java.util.Set;
import org.apache.camel.LoggingLevel;
import org.apache.camel.catalog.RuntimeCamelCatalog;
-import org.apache.camel.kafkaconnector.CamelSinkConnectorConfig;
-import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.camel.tooling.model.BaseOptionModel;
import org.apache.camel.tooling.model.ComponentModel;
import org.apache.camel.tooling.model.JsonMapper;
-import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
public final class TaskHelper {
@@ -98,37 +94,29 @@ public final class TaskHelper {
return false;
}
- public static <CFG extends AbstractConfig> void logRecordContent(Logger logger, ConnectRecord<?> record, CFG config) {
- if (logger != null && record != null && config != null) {
- // do not log record's content by default, as it may contain sensitive information
- LoggingLevel level = LoggingLevel.OFF;
- try {
- final String key = (record instanceof SourceRecord)
- ? CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF
- : CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF;
- level = LoggingLevel.valueOf(config.getString(key).toUpperCase());
- } catch (Exception e) {
- logger.warn("Invalid value for contentLogLevel property");
- }
- switch (level) {
- case TRACE:
- logger.trace(record.toString());
- break;
- case DEBUG:
- logger.debug(record.toString());
- break;
- case INFO:
- logger.info(record.toString());
- break;
- case WARN:
- logger.warn(record.toString());
- break;
- case ERROR:
- logger.error(record.toString());
- break;
- default:
- break;
- }
+ public static void logRecordContent(Logger logger, LoggingLevel level, ConnectRecord<?> record) {
+ if (level == LoggingLevel.OFF) {
+ return;
+ }
+
+ switch (level) {
+ case TRACE:
+ logger.trace(record.toString());
+ break;
+ case DEBUG:
+ logger.debug(record.toString());
+ break;
+ case INFO:
+ logger.info(record.toString());
+ break;
+ case WARN:
+ logger.warn(record.toString());
+ break;
+ case ERROR:
+ logger.error(record.toString());
+ break;
+ default:
+ break;
}
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 5da421c..f8e4758 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -27,13 +27,13 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.kafkaconnector.CamelSourceConnectorConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.ext.LoggerWrapper;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -144,58 +144,36 @@ public class TaskHelperTest {
assertEquals("cql:localhost:8080/test", result);
}
- private CamelSourceConnectorConfig getSourceConnectorConfig(String logLevel) {
- return new CamelSourceConnectorConfig(CamelSourceConnectorConfig.conf(),
- Collections.singletonMap(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, logLevel));
- }
-
@Test
public void testlogRecordContent() {
- String partName = "abc123";
- Logger logger = new MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
- SourceRecord record = new SourceRecord(Collections.singletonMap("partition", partName),
+ final String partName = "abc123";
+ final MyLogger logger = new MyLogger(LoggerFactory.getLogger(TaskHelperTest.class), null);
+ final SourceRecord record = new SourceRecord(
+ Collections.singletonMap("partition", partName),
Collections.singletonMap("offset", "0"), null, null, null, null);
- Queue<String> logEvents = ((MyLogger)logger).getEvents();
-
- String offLevel = LoggingLevel.OFF.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(offLevel));
- assertNull(logEvents.poll());
-
- String traceLevel = LoggingLevel.TRACE.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(traceLevel));
- assertTrue(logEvents.peek().contains(traceLevel) && logEvents.poll().contains(partName));
-
- String debugLevel = LoggingLevel.DEBUG.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(debugLevel));
- assertTrue(logEvents.peek().contains(debugLevel) && logEvents.poll().contains(partName));
-
- String infoLevel = LoggingLevel.INFO.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(infoLevel));
- assertTrue(logEvents.peek().contains(infoLevel) && logEvents.poll().contains(partName));
-
- String warnLevel = LoggingLevel.WARN.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(warnLevel));
- assertTrue(logEvents.peek().contains(warnLevel) && logEvents.poll().contains(partName));
- String errorLevel = LoggingLevel.ERROR.toString();
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(errorLevel));
- assertTrue(logEvents.peek().contains(errorLevel) && logEvents.poll().contains(partName));
+ TaskHelper.logRecordContent(logger, LoggingLevel.OFF, record);
+ assertNull(logger.getEvents().poll());
- TaskHelper.logRecordContent(null, record, getSourceConnectorConfig(debugLevel));
- assertNull(logEvents.poll());
+ TaskHelper.logRecordContent(logger, LoggingLevel.TRACE, record);
+ assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.TRACE.toString());
+ assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
- TaskHelper.logRecordContent(logger, null, getSourceConnectorConfig(debugLevel));
- assertNull(logEvents.poll());
+ TaskHelper.logRecordContent(logger, LoggingLevel.DEBUG, record);
+ assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.DEBUG.toString());
+ assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
- TaskHelper.logRecordContent(logger, record, null);
- assertNull(logEvents.poll());
+ TaskHelper.logRecordContent(logger, LoggingLevel.INFO, record);
+ assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.INFO.toString());
+ assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
- String invalidLevel = "NOLOG";
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(invalidLevel));
- assertTrue(logEvents.poll().contains(warnLevel));
+ TaskHelper.logRecordContent(logger, LoggingLevel.WARN, record);
+ assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.WARN.toString());
+ assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
- TaskHelper.logRecordContent(logger, record, getSourceConnectorConfig(null));
- assertTrue(logEvents.poll().contains(warnLevel));
+ TaskHelper.logRecordContent(logger, LoggingLevel.ERROR, record);
+ assertThat(logger.getEvents().peek()).isNotNull().contains(LoggingLevel.ERROR.toString());
+ assertThat(logger.getEvents().poll()).isNotNull().contains(partName);
}
class MyLogger extends LoggerWrapper {