You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/30 06:30:08 UTC
[kafka] branch trunk updated: MINOR: Add logging to Connect SMTs
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4712a36 MINOR: Add logging to Connect SMTs
4712a36 is described below
commit 4712a3641619e86b8e6d901355088f6ae06e9f37
Author: Cyrus Vafadari <cy...@confluent.io>
AuthorDate: Thu Nov 29 22:29:50 2018 -0800
MINOR: Add logging to Connect SMTs
Includes Update to ConnectRecord string representation to give
visibility into schemas, useful in SMT tracing
Author: Cyrus Vafadari <cy...@confluent.io>
Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5860 from cyrusv/cyrus-logging
---
.../org/apache/kafka/connect/connector/ConnectRecord.java | 2 ++
.../apache/kafka/connect/runtime/TransformationChain.java | 13 +++++++++++++
.../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 ++
.../main/java/org/apache/kafka/connect/transforms/Cast.java | 4 ++++
.../apache/kafka/connect/transforms/SetSchemaMetadata.java | 7 ++++++-
5 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index aa58e63..7eced85 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -140,7 +140,9 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
"topic='" + topic + '\'' +
", kafkaPartition=" + kafkaPartition +
", key=" + key +
+ ", keySchema=" + keySchema.toString() +
", value=" + value +
+ ", valueSchema=" + valueSchema.toString() +
", timestamp=" + timestamp +
", headers=" + headers +
'}';
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 3680905..a077a01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -20,11 +20,15 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.transforms.Transformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
+import java.util.StringJoiner;
public class TransformationChain<R extends ConnectRecord<R>> {
+ private static final Logger log = LoggerFactory.getLogger(TransformationChain.class);
private final List<Transformation<R>> transformations;
private final RetryWithToleranceOperator retryWithToleranceOperator;
@@ -40,6 +44,8 @@ public class TransformationChain<R extends ConnectRecord<R>> {
for (final Transformation<R> transformation : transformations) {
final R current = record;
+ log.trace("Applying transformation {} to {}",
+ transformation.getClass().getName(), record);
// execute the operation
record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
@@ -68,4 +74,11 @@ public class TransformationChain<R extends ConnectRecord<R>> {
return Objects.hash(transformations);
}
+ /**
+ * Builds a human-readable description of this chain: the runtime class name of the chain
+ * followed by the class names of its transformations, comma-separated and wrapped in braces.
+ * Transformations are listed in the iteration order of the {@code transformations} list.
+ *
+ * @return a string of the form {@code ChainClass{first.Transform, second.Transform}}
+ */
+ @Override
+ public String toString() {
+ // The prefix and suffix are given to the joiner so an empty chain still renders as "ChainClass{}".
+ StringJoiner chain = new StringJoiner(", ", getClass().getName() + "{", "}");
+ for (Transformation<R> transformation : transformations) {
+ chain.add(transformation.getClass().getName());
+ }
+ return chain.toString();
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 6e021b9..81a165c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -494,6 +494,7 @@ public class Worker {
if (task instanceof SourceTask) {
retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
@@ -506,6 +507,7 @@ public class Worker {
time, retryWithToleranceOperator);
} else if (task instanceof SinkTask) {
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+ log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics));
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 07ccd37..3dc6dc7 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -32,6 +32,8 @@ import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.EnumSet;
import java.util.HashMap;
@@ -44,6 +46,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> {
+ private static final Logger log = LoggerFactory.getLogger(Cast.class);
// TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to
// allow casting nested fields.
@@ -156,6 +159,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
final Object origFieldValue = value.get(field);
final Schema.Type targetType = casts.get(field.name());
final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
+ log.trace("Cast field '{}' from '{}' to '{}'", field.name(), origFieldValue, newFieldValue);
updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
}
return newRecord(record, updatedSchema, updatedValue);
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
index 901ac9f..fd3cbf3 100644
--- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java
@@ -24,12 +24,15 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema;
public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> {
+ private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class);
public static final String OVERVIEW_DOC =
"Set the schema name, version or both on the record's key (<code>" + Key.class.getName() + "</code>)"
@@ -76,6 +79,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
isMap ? schema.keySchema() : null,
isMap || isArray ? schema.valueSchema() : null
);
+ log.trace("Applying SetSchemaMetadata SMT. Original schema: {}, updated schema: {}",
+ schema, updatedSchema);
return newRecord(record, updatedSchema);
}
@@ -149,4 +154,4 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T
}
return keyOrValue;
}
-}
\ No newline at end of file
+}