You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/30 06:30:22 UTC

[kafka] branch 2.1 updated: MINOR: Add logging to Connect SMTs

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 45b3971  MINOR: Add logging to Connect SMTs
45b3971 is described below

commit 45b39710d8bfccf945f6aa1e392704ef6008339d
Author: Cyrus Vafadari <cy...@confluent.io>
AuthorDate: Thu Nov 29 22:29:50 2018 -0800

    MINOR: Add logging to Connect SMTs
    
    Includes an update to the ConnectRecord string representation to give
    visibility into schemas, which is useful in SMT tracing
    
    Author: Cyrus Vafadari <cy...@confluent.io>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5860 from cyrusv/cyrus-logging
    
    (cherry picked from commit 4712a3641619e86b8e6d901355088f6ae06e9f37)
    Signed-off-by: Ewen Cheslack-Postava <me...@ewencp.org>
---
 .../org/apache/kafka/connect/connector/ConnectRecord.java   |  2 ++
 .../apache/kafka/connect/runtime/TransformationChain.java   | 13 +++++++++++++
 .../main/java/org/apache/kafka/connect/runtime/Worker.java  |  2 ++
 .../main/java/org/apache/kafka/connect/transforms/Cast.java |  4 ++++
 .../apache/kafka/connect/transforms/SetSchemaMetadata.java  |  7 ++++++-
 5 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index 2c5f514..55272c2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -140,7 +140,9 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
                 "topic='" + topic + '\'' +
                 ", kafkaPartition=" + kafkaPartition +
                 ", key=" + key +
+                ", keySchema=" + keySchema + // implicit conversion avoids an NPE if the schema is null
                 ", value=" + value +
+                ", valueSchema=" + valueSchema + // implicit conversion avoids an NPE if the schema is null
                 ", timestamp=" + timestamp +
                 ", headers=" + headers +
                 '}';
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 3680905..a077a01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -20,11 +20,15 @@ import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.StringJoiner;
 
 public class TransformationChain<R extends ConnectRecord<R>> {
+    private static final Logger log = LoggerFactory.getLogger(TransformationChain.class);
 
     private final List<Transformation<R>> transformations;
     private final RetryWithToleranceOperator retryWithToleranceOperator;
@@ -40,6 +44,8 @@ public class TransformationChain<R extends ConnectRecord<R>> {
         for (final Transformation<R> transformation : transformations) {
             final R current = record;
 
+            log.trace("Applying transformation {} to {}",
+                transformation.getClass().getName(), record);
             // execute the operation
             record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
 
@@ -68,4 +74,11 @@ public class TransformationChain<R extends ConnectRecord<R>> {
         return Objects.hash(transformations);
     }
 
+    /** Returns this chain's class name followed by the class names of its transformations in list order. */
+    @Override
+    public String toString() {
+        StringJoiner chain = new StringJoiner(", ", getClass().getName() + "{", "}");
+        transformations.forEach(transformation -> chain.add(transformation.getClass().getName()));
+        return chain.toString();
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index df73a43..1fd91d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -493,6 +493,7 @@ public class Worker {
         if (task instanceof SourceTask) {
             retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
@@ -505,6 +506,7 @@ public class Worker {
                     time, retryWithToleranceOperator);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 07ccd37..3dc6dc7 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -32,6 +32,8 @@ import org.apache.kafka.connect.data.Values;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -44,6 +46,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
 import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
 
 public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger log = LoggerFactory.getLogger(Cast.class);
 
     // TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to
     // allow casting nested fields.
@@ -156,6 +159,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
             final Object origFieldValue = value.get(field);
             final Schema.Type targetType = casts.get(field.name());
             final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
+            log.trace("Cast field '{}' from '{}' to '{}'", field.name(), origFieldValue, newFieldValue);
             updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
         }
         return newRecord(record, updatedSchema, updatedValue);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index 901ac9f..fd3cbf3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -24,12 +24,15 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
 
 public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
+    private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);
 
     public static final String OVERVIEW_DOC =
             "Set the schema name, version or both on the record's key (<code>" + Key.class.getName() + "</code>)"
@@ -76,6 +79,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
                 isMap ? schema.keySchema() : null,
                 isMap || isArray ? schema.valueSchema() : null
         );
+        log.trace("Applying SetSchemaMetadata SMT. Original schema: {}, updated schema: {}",
+            schema, updatedSchema);
         return newRecord(record, updatedSchema);
     }
 
@@ -149,4 +154,4 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
         }
         return keyOrValue;
     }
-}
\ No newline at end of file
+}