You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/18 17:18:55 UTC
[camel-kafka-connector] branch master updated: fixed #980 :
camel.source.contentLogLevel config not honored in source connectors
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 3b91864 fixed #980 : camel.source.contentLogLevel config not honored in source connectors
3b91864 is described below
commit 3b91864173ef7dcc3e99eea4bcd17f0d7561ca71
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Thu Feb 4 23:12:27 2021 +0100
fixed #980 : camel.source.contentLogLevel config not honored in source connectors
---
.../org/apache/camel/kafkaconnector/CamelSinkTask.java | 14 +++++++++++---
.../org/apache/camel/kafkaconnector/CamelSourceTask.java | 15 +++++++++++----
.../apache/camel/kafkaconnector/CamelSinkTaskTest.java | 13 +++++++++++++
.../apache/camel/kafkaconnector/CamelSourceTaskTest.java | 14 ++++++++++++++
4 files changed, 49 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9352ed6..82c16d2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask {
private static final String LOCAL_URL = "direct:start";
private ErrantRecordReporter reporter;
-
private CamelKafkaConnectMain cms;
private ProducerTemplate producer;
private Endpoint localEndpoint;
+
private LoggingLevel loggingLevel = LoggingLevel.OFF;
private boolean mapProperties;
private boolean mapHeaders;
@@ -83,11 +83,11 @@ public class CamelSinkTask extends SinkTask {
}
}
+ String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
try {
- String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
} catch (Exception e) {
- LOG.debug("Invalid value for {} property", CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+ LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
}
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
@@ -239,4 +239,12 @@ public class CamelSinkTask extends SinkTask {
CamelKafkaConnectMain getCms() {
return cms;
}
+
+ public LoggingLevel getLoggingLevel() {
+ return loggingLevel;
+ }
+
+ public void setLoggingLevel(LoggingLevel loggingLevel) {
+ this.loggingLevel = loggingLevel;
+ }
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4c138af..16e6bfc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -73,11 +73,11 @@ public class CamelSourceTask extends SourceTask {
Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps);
+ String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
try {
- String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
- loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+ loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
} catch (Exception e) {
- LOG.debug("Invalid value for {} property", CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+ LOG.error("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
}
maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
@@ -153,7 +153,6 @@ public class CamelSourceTask extends SourceTask {
return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
}
-
@Override
public synchronized List<SourceRecord> poll() {
final long startPollEpochMilli = Instant.now().toEpochMilli();
@@ -313,4 +312,12 @@ public class CamelSourceTask extends SourceTask {
CamelKafkaConnectMain getCms() {
return cms;
}
+
+ public LoggingLevel getLoggingLevel() {
+ return loggingLevel;
+ }
+
+ public void setLoggingLevel(LoggingLevel loggingLevel) {
+ this.loggingLevel = loggingLevel;
+ }
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 5aaca7f..bab0a5d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest {
}
}
+ @Test
+ public void testContentLogLevelConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+ props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+ CamelSinkTask sinkTask = new CamelSinkTask();
+ sinkTask.start(props);
+ assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+
+ sinkTask.stop();
+ }
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b2a7c4e..21d56fc 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -607,4 +607,18 @@ public class CamelSourceTaskTest {
sourceTask.stop();
}
}
+
+ @Test
+ public void testContentLogLevelConfiguration() {
+ Map<String, String> props = new HashMap<>();
+ props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+ props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+ CamelSourceTask sourceTask = new CamelSourceTask();
+ sourceTask.start(props);
+ assertEquals(LoggingLevel.INFO, sourceTask.getLoggingLevel());
+
+ sourceTask.stop();
+ }
}