You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2021/02/18 17:18:55 UTC

[camel-kafka-connector] branch master updated: fixed #980 : camel.source.contentLogLevel config not honored in source connectors

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b91864  fixed #980 : camel.source.contentLogLevel config not honored in source connectors
3b91864 is described below

commit 3b91864173ef7dcc3e99eea4bcd17f0d7561ca71
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Thu Feb 4 23:12:27 2021 +0100

    fixed #980 : camel.source.contentLogLevel config not honored in source connectors
---
 .../org/apache/camel/kafkaconnector/CamelSinkTask.java    | 14 +++++++++++---
 .../org/apache/camel/kafkaconnector/CamelSourceTask.java  | 15 +++++++++++----
 .../apache/camel/kafkaconnector/CamelSinkTaskTest.java    | 13 +++++++++++++
 .../apache/camel/kafkaconnector/CamelSourceTaskTest.java  | 14 ++++++++++++++
 4 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9352ed6..82c16d2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -54,10 +54,10 @@ public class CamelSinkTask extends SinkTask {
     private static final String LOCAL_URL = "direct:start";
     private ErrantRecordReporter reporter;
 
-
     private CamelKafkaConnectMain cms;
     private ProducerTemplate producer;
     private Endpoint localEndpoint;
+
     private LoggingLevel loggingLevel = LoggingLevel.OFF;
     private boolean mapProperties;
     private boolean mapHeaders;
@@ -83,11 +83,11 @@ public class CamelSinkTask extends SinkTask {
                 }
             }
 
+            String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
                 loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
+                LOG.debug("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF);
             }
 
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
@@ -239,4 +239,12 @@ public class CamelSinkTask extends SinkTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 4c138af..16e6bfc 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -73,11 +73,11 @@ public class CamelSourceTask extends SourceTask {
             Map<String, String> actualProps = TaskHelper.combineDefaultAndLoadedProperties(getDefaultConfig(), props);
             CamelSourceConnectorConfig config = getCamelSourceConnectorConfig(actualProps);
 
+            String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             try {
-                String levelStr = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
-                loggingLevel = LoggingLevel.valueOf(levelStr.toLowerCase());
+                loggingLevel = LoggingLevel.valueOf(levelStr.toUpperCase());
             } catch (Exception e) {
-                LOG.debug("Invalid value for {} property", CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
+                LOG.error("Invalid value {} for {} property", levelStr.toUpperCase(), CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF);
             }
 
             maxBatchPollSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF);
@@ -153,7 +153,6 @@ public class CamelSourceTask extends SourceTask {
         return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
     }
 
-
     @Override
     public synchronized List<SourceRecord> poll() {
         final long startPollEpochMilli = Instant.now().toEpochMilli();
@@ -313,4 +312,12 @@ public class CamelSourceTask extends SourceTask {
     CamelKafkaConnectMain getCms() {
         return cms;
     }
+
+    public LoggingLevel getLoggingLevel() {
+        return loggingLevel;
+    }
+
+    public void setLoggingLevel(LoggingLevel loggingLevel) {
+        this.loggingLevel = loggingLevel;
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 5aaca7f..bab0a5d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -1111,4 +1111,17 @@ public class CamelSinkTaskTest {
         }
     }
 
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+        assertEquals(LoggingLevel.INFO, sinkTask.getLoggingLevel());
+
+        sinkTask.stop();
+    }
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index b2a7c4e..21d56fc 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -607,4 +607,18 @@ public class CamelSourceTaskTest {
             sourceTask.stop();
         }
     }
+
+    @Test
+    public void testContentLogLevelConfiguration() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI);
+        props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, "INFO");
+
+        CamelSourceTask sourceTask = new CamelSourceTask();
+        sourceTask.start(props);
+        assertEquals(LoggingLevel.INFO, sourceTask.getLoggingLevel());
+
+        sourceTask.stop();
+    }
 }