You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/09/30 23:53:06 UTC

[kafka] branch trunk updated: MINOR: don't log config during unit tests (#5671)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 49b5206  MINOR: don't log config during unit tests (#5671)
49b5206 is described below

commit 49b5206a82d0a47e01f4c5f7098119440061a1bb
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Sep 30 18:52:55 2018 -0500

    MINOR: don't log config during unit tests (#5671)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../apache/kafka/streams/TopologyTestDriver.java   |  3 +-
 .../streams/internals/QuietStreamsConfig.java      | 33 ++++++++++++++++++++++
 .../streams/processor/MockProcessorContext.java    | 18 ++++++------
 3 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 05a128b..fd3dcfe 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.QuietStreamsConfig;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -235,7 +236,7 @@ public class TopologyTestDriver implements Closeable {
     private TopologyTestDriver(final InternalTopologyBuilder builder,
                                final Properties config,
                                final long initialWallClockTimeMs) {
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
         mockWallClockTime = new MockTime(initialWallClockTimeMs);
 
         internalTopologyBuilder = builder;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
new file mode 100644
index 0000000..6132668
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Map;
+
+/**
+ * A {@link StreamsConfig} that does not log its configuration on construction.
+ *
+ * This producer cleaner output for unit tests using the {@code test-utils},
+ * since logging the config is not really valuable in this context.
+ */
+public class QuietStreamsConfig extends StreamsConfig {
+    public QuietStreamsConfig(final Map<?, ?> props) {
+        super(props, false);
+    }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index dc854b0..553428d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.QuietStreamsConfig;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -201,7 +202,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
      */
     @SuppressWarnings({"WeakerAccess", "unused"})
     public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
-        final StreamsConfig streamsConfig = new StreamsConfig(config);
+        final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
         this.taskId = taskId;
         this.config = streamsConfig;
         this.stateDir = stateDir;
@@ -382,12 +383,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
         punctuators.add(capturedPunctuator);
 
-        return new Cancellable() {
-            @Override
-            public void cancel() {
-                capturedPunctuator.cancel();
-            }
-        };
+        return capturedPunctuator::cancel;
     }
 
     /**
@@ -506,8 +502,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         // This interface is assumed by state stores that add change-logging.
         // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
 
-        throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
-            "For processor unit tests, use an in-memory state store with change-logging disabled. " +
-            "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
     }
 }