You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/09/30 23:53:06 UTC
[kafka] branch trunk updated: MINOR: don't log config during unit
tests (#5671)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 49b5206 MINOR: don't log config during unit tests (#5671)
49b5206 is described below
commit 49b5206a82d0a47e01f4c5f7098119440061a1bb
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sun Sep 30 18:52:55 2018 -0500
MINOR: don't log config during unit tests (#5671)
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../apache/kafka/streams/TopologyTestDriver.java | 3 +-
.../streams/internals/QuietStreamsConfig.java | 33 ++++++++++++++++++++++
.../streams/processor/MockProcessorContext.java | 18 ++++++------
3 files changed, 43 insertions(+), 11 deletions(-)
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 05a128b..fd3dcfe 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -235,7 +236,7 @@ public class TopologyTestDriver implements Closeable {
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
- final StreamsConfig streamsConfig = new StreamsConfig(config);
+ final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
mockWallClockTime = new MockTime(initialWallClockTimeMs);
internalTopologyBuilder = builder;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
new file mode 100644
index 0000000..6132668
--- /dev/null
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/internals/QuietStreamsConfig.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals;
+
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.util.Map;
+
+/**
+ * A {@link StreamsConfig} that does not log its configuration on construction.
+ *
+ * This produces cleaner output for unit tests using the {@code test-utils},
+ * since logging the config is not really valuable in this context.
+ */
+public class QuietStreamsConfig extends StreamsConfig {
+ public QuietStreamsConfig(final Map<?, ?> props) {
+ super(props, false); // NOTE(review): 'false' presumably turns off config logging in StreamsConfig -- confirm
+ }
+}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index dc854b0..553428d 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -201,7 +202,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
- final StreamsConfig streamsConfig = new StreamsConfig(config);
+ final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;
@@ -382,12 +383,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
punctuators.add(capturedPunctuator);
- return new Cancellable() {
- @Override
- public void cancel() {
- capturedPunctuator.cancel();
- }
- };
+ return capturedPunctuator::cancel;
}
/**
@@ -506,8 +502,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
// This interface is assumed by state stores that add change-logging.
// Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
- throw new UnsupportedOperationException("MockProcessorContext does not provide record collection. " +
- "For processor unit tests, use an in-memory state store with change-logging disabled. " +
- "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration.");
+ throw new UnsupportedOperationException(
+ "MockProcessorContext does not provide record collection. " +
+ "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+ "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+ );
}
}