You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/08 17:29:17 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

vvcephei opened a new pull request #9396:
URL: https://github.com/apache/kafka/pull/9396


   Implements KIP-478 for the test-utils module:
   * adds mocks of the new ProcessorContext and StateStoreContext
   * adds tests that all stores and store builders are usable with the new mock
   * adds tests that the new Processor api is usable with the new mock
   * updates the demonstration Processor to the new api
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r502737067



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       Unfortunately, that would actually expose `InternalProcessorContext` itself as a public interface (any supertype of a public type is a public type).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r501891955



##########
File path: checkstyle/suppressions.xml
##########
@@ -194,13 +194,13 @@
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest).java"/>
+              files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

Review comment:
       This test violates both measures of complexity by virtue of the way it works: the test specifically includes a lot of loops so that it can generate every combination of store and store builder configuration to verify that everything works as expected.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -162,4 +164,30 @@ public Headers headers() {
     public Record<K, V> withHeaders(final Headers headers) {
         return new Record<>(key, value, timestamp, headers);
     }
+
+    @Override
+    public String toString() {

Review comment:
       added for quality-of-life debugging

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -75,18 +75,25 @@ public String name() {
     @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        this.context = (InternalProcessorContext) context;
-
-        final StreamsMetricsImpl metrics = this.context.metrics();
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();
-        expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
-            threadId,
-            taskName,
-            metricScope,
-            name,
-            metrics
-        );
+
+        // The provided context is not required to implement InternalProcessorContext,
+        // If it doesn't, we can't record this metric.
+        if (context instanceof InternalProcessorContext) {

Review comment:
       As a testament to `MockProcessorContextStateStoreTest`, it actually found this bug. I had overlooked this usage while making the other root stores context-implementation agnostic in the last PR.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );

Review comment:
       `stateDir` isn't a required argument, but having a null value for it and then trying to use a rocksdb store leads to a _very_ cryptic error like, "you have to open all column families". Placing a null-check right here allows us to fail fast and prints an explanatory message any time you use the context with a component that actually needs the stateDir.
   
   This was also a problem with the prior implementation, but I didn't notice it until I implemented `MockProcessorContextStateStoreTest`.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java
##########
@@ -18,43 +18,41 @@
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.time.Duration;
 import java.util.Locale;
 
-public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String> {
+public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String, String, String> {

Review comment:
       Converting this processor to the new API.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       This is a clone of the `processor.MockProcessorContext`, but implementing the new `api.ProcessorContext` interface.
   
   The two context interfaces don't technically collide, but creating a standalone implementation allows us to provide type safety in the CapturedForward inner class without any backward compatibility concerns.
   
   It also (IMO) makes it easier for users to make a clean switch to the new API by removing any ambiguity about whether a test is making use of the old API or the new API, or some mixture of both.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -162,4 +164,30 @@ public Headers headers() {
     public Record<K, V> withHeaders(final Headers headers) {
         return new Record<>(key, value, timestamp, headers);
     }
+
+    @Override
+    public String toString() {
+        return "Record{" +
+            "key=" + key +
+            ", value=" + value +
+            ", timestamp=" + timestamp +
+            ", headers=" + headers +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       added for easier assertions in unit tests

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
##########
@@ -231,14 +230,13 @@ public void process(final String key, final Long value) {
 
         assertFalse(context.committed());
     }
-    @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437

Review comment:
       Don't need to suppress deprecation anymore, since it's moved to the class level, and don't need unchecked anymore thanks to the return-type change of `getStateStore`.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockProcessorContextAPITest {

Review comment:
       This is a copy of the unit test for the old MockProcessorContext.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(value = Parameterized.class)
+public class MockProcessorContextStateStoreTest {

Review comment:
       This is a new test. I wanted to make sure that no matter what kind of stores users select, they'll get well-defined results with the new MockProcessorContext.
   
   Some store features are well-defined not to be supported (they throw an IllegalArgumentException when you try to enable caching or logging), but all store types should otherwise work.
   
   Here, we're defining "work" as "`init` doesn't throw an exception", since this is the main involvement that the context itself has with the store.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
##########
@@ -43,6 +43,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation") // this is a test of a deprecated API

Review comment:
       This is the pre-existing test of the old-API ProcessorContext.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
##########
@@ -53,9 +54,9 @@ public void setup() {
         keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
     }
 
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
+    @SuppressWarnings("deprecation") // test of deprecated method

Review comment:
       My prior comment was wrong. We do want to test the deprecated init method as well as the non-deprecated one.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     */
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset) {
+        recordMetadata = new MockRecordMetadata(topic, partition, offset);
+    }
+
+    @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordMetadata);
+    }
+
+    // mocks ================================================
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    public <S extends StateStore> void addStateStore(final S stateStore) {
+        stateStores.put(stateStore.name(), stateStore);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+        capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+        for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+            if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {

Review comment:
       Rather than have the MockProcessorContext implement StateStoreContext as well, I think it's a good idea to explicitly get a StateStoreContext view of the same object. This will help users to have a clearer understanding of which context is which.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
##########
@@ -31,19 +32,17 @@
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
+import static java.util.Arrays.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThrows;
 
 public class WindowedWordCountProcessorTest {
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithInMemoryStore() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new MockProcessorContext<>();

Review comment:
       Converting the test for the processor to the new API as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r503574736



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       `InternalProcessorContext` is already public interface but it's in `internals` package, so I figured it is okay?
   
   Anyways, this is not much blocking this PR, so feel free to merge it anyways and we can keep discussing here while you merge.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#issuecomment-707844037


   The test failure was unrelated:
   `Build / JDK 11 / kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r504073110



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       Thanks. I'll go ahead with the merge, then.
   
   It seems like `InternalProcessorContext` is only "public" in the sense that Java 8 is insufficiently expressive to make it private. I guess I should say "internal" and "external". But it is certainly an _internal_ type, therefore, we should not ever return it to the users when they call _external_ APIs (such as the constructor of this class).
   
   Once we upgrade to java 9, we should add module definitions so that we stop exporting internal types in the public API completely. As I understand things, it would actually become a compiler error at that point for us to provide an external class that inherits from an internal interface.
   
   I guess I don't see this as a big concern anyway. The way I implemented the state stores in the last PR makes them capable of accepting any implementation of `ProcessorContext` or `StateStoreContext`, although they'll disable some features if the context isn't an instance of `InternalProcessorContext`. To me, this seems like a fine situation. We could also note that there is https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext, which would remove the need for casting to InternalProcessorContext to begin with.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r502067124



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       Since we are adding a new class, could we have it extend InternalProcessorContext then, so that when we remove the old ones we can also cleanup the non-testing functions that branch on checking `instanceof InternalProcessorContext`: I'm assuming that `InternalProcessorContext` would stay in the end state, it would just extend `api.ProcessorContext<Object, Object>` and not `StateStoreContext` in the future.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     */
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset) {
+        recordMetadata = new MockRecordMetadata(topic, partition, offset);
+    }
+
+    @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordMetadata);
+    }
+
+    // mocks ================================================
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    public <S extends StateStore> void addStateStore(final S stateStore) {
+        stateStores.put(stateStore.name(), stateStore);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+        capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+        for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+            if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {

Review comment:
       I agree. I think in the long run MockProcessorContext could extend InternalProcessorContext, which only extends api.ProcessorContext. When that's the case we should be able to remove the non-testing code's safeguards.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
##########
@@ -136,54 +133,18 @@ public void shouldWorkWithPersistentStore() throws IOException {
             context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
 
             // finally, we can verify the output.
-            final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
-            assertThat(capturedForwards.hasNext(), is(false));
+            final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
+            final List<CapturedForward<? extends String, ? extends String>> expected = asList(
+                new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
+                new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
+            );
+
+            assertThat(capturedForwards, is(expected));
         } finally {
             Utils.delete(stateDir);
         }
     }
-
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
-    @Test
-    public void shouldFailWithLogging() {

Review comment:
       Can we remove those two tests?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r501891955



##########
File path: checkstyle/suppressions.xml
##########
@@ -194,13 +194,13 @@
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest).java"/>
+              files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

Review comment:
       This test violates both measures of complexity by virtue of the way it works: the test specifically includes a lot of loops so that it can generate every combination of store and store builder configuration to verify that everything works as expected.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -162,4 +164,30 @@ public Headers headers() {
     public Record<K, V> withHeaders(final Headers headers) {
         return new Record<>(key, value, timestamp, headers);
     }
+
+    @Override
+    public String toString() {

Review comment:
       added for quality-of-life debugging

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -75,18 +75,25 @@ public String name() {
     @Deprecated
     @Override
     public void init(final ProcessorContext context, final StateStore root) {
-        this.context = (InternalProcessorContext) context;
-
-        final StreamsMetricsImpl metrics = this.context.metrics();
         final String threadId = Thread.currentThread().getName();
         final String taskName = context.taskId().toString();
-        expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
-            threadId,
-            taskName,
-            metricScope,
-            name,
-            metrics
-        );
+
+        // The provided context is not required to implement InternalProcessorContext,
+        // If it doesn't, we can't record this metric.
+        if (context instanceof InternalProcessorContext) {

Review comment:
       As a testament to `MockProcessorContextStateStoreTest`, it actually found this bug. I had overlooked this usage while making the other root stores context-implementation agnostic in the last PR.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );

Review comment:
       `stateDir` isn't a required argument, but having a null value for it and then trying to use a rocksdb store leads to a _very_ cryptic error like, "you have to open all column families". Placing a null-check right here allows us to fail fast and prints an explanatory message any time you use the context with a component that actually needs the stateDir.
   
   This was also a problem with the prior implementation, but I didn't notice it until I implemented `MockProcessorContextStateStoreTest`.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorSupplier.java
##########
@@ -18,43 +18,41 @@
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.time.Duration;
 import java.util.Locale;
 
-public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String> {
+public final class WindowedWordCountProcessorSupplier implements ProcessorSupplier<String, String, String, String> {

Review comment:
       Converting this processor to the new API.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       This is a clone of the `processor.MockProcessorContext`, but implementing the new `api.ProcessorContext` interface.
   
   The two context interfaces don't technically collide, but creating a standalone implementation allows us to provide type safety in the CapturedForward inner class without any backward compatibility concerns.
   
   It also (IMO) makes it easier for users to make a clean switch to the new API by removing any ambiguity about whether a test is making use of the old API or the new API, or some mixture of both.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -162,4 +164,30 @@ public Headers headers() {
     public Record<K, V> withHeaders(final Headers headers) {
         return new Record<>(key, value, timestamp, headers);
     }
+
+    @Override
+    public String toString() {
+        return "Record{" +
+            "key=" + key +
+            ", value=" + value +
+            ", timestamp=" + timestamp +
+            ", headers=" + headers +
+            '}';
+    }
+
+    @Override
+    public boolean equals(final Object o) {

Review comment:
       added for easier assertions in unit tests

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
##########
@@ -231,14 +230,13 @@ public void process(final String key, final Long value) {
 
         assertFalse(context.committed());
     }
-    @SuppressWarnings({"deprecation", "unchecked"}) // TODO deprecation will be fixed in KAFKA-10437

Review comment:
       Don't need to suppress deprecation anymore, since it's moved to the class level, and don't need unchecked anymore thanks to the return-type change of `getStateStore`.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockProcessorContextAPITest {

Review comment:
       This is a copy of the unit test for the old MockProcessorContext.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextStateStoreTest.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(value = Parameterized.class)
+public class MockProcessorContextStateStoreTest {

Review comment:
       This is a new test. I wanted to make sure that no matter what kind of stores users select, they'll get well-defined results with the new MockProcessorContext.
   
   Some store features are well-defined not to be supported (they throw an IllegalArgumentException when you try to enable caching or logging), but all store types should otherwise work.
   
   Here, we're defining "work" as "`init` doesn't throw an exception", since this is the main involvement that the context itself has with the store.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java
##########
@@ -43,6 +43,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("deprecation") // this is a test of a deprecated API

Review comment:
       This is the pre-existing test of the old-API ProcessorContext.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/internals/KeyValueStoreFacadeTest.java
##########
@@ -53,9 +54,9 @@ public void setup() {
         keyValueStoreFacade = new KeyValueStoreFacade<>(mockedKeyValueTimestampStore);
     }
 
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
+    @SuppressWarnings("deprecation") // test of deprecated method

Review comment:
       My prior comment was wrong. We do want to test the deprecated init method as well as the non-deprecated one.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     */
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset) {
+        recordMetadata = new MockRecordMetadata(topic, partition, offset);
+    }
+
+    @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordMetadata);
+    }
+
+    // mocks ================================================
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    public <S extends StateStore> void addStateStore(final S stateStore) {
+        stateStores.put(stateStore.name(), stateStore);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+        capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+        for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+            if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {

Review comment:
       Rather than have the MockProcessorContext implement StateStoreContext as well, I think it's a good idea to explicitly get a StateStoreContext view of the same object. This will help users to have a clearer understanding of which context is which.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
##########
@@ -31,19 +32,17 @@
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 
+import static java.util.Arrays.asList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThrows;
 
 public class WindowedWordCountProcessorTest {
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void shouldWorkWithInMemoryStore() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new MockProcessorContext<>();

Review comment:
       Converting the test for the processor to the new API as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r502737231



##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
##########
@@ -136,54 +133,18 @@ public void shouldWorkWithPersistentStore() throws IOException {
             context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
 
             // finally, we can verify the output.
-            final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
-            assertThat(capturedForwards.hasNext(), is(false));
+            final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
+            final List<CapturedForward<? extends String, ? extends String>> expected = asList(
+                new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
+                new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
+            );
+
+            assertThat(capturedForwards, is(expected));
         } finally {
             Utils.delete(stateDir);
         }
     }
-
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
-    @Test
-    public void shouldFailWithLogging() {

Review comment:
       Yes, they are superceded by the new `MockProcessorContextStateStoreTest`, which verifies that any state store configured with logging or caching is rejected.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9396:
URL: https://github.com/apache/kafka/pull/9396


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#issuecomment-707874181


   Cherry-picked to 2.7 (cc @bbejeck )


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r502067124



##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
       Since we are adding a new class, could we have it extend InternalProcessorContext then, so that when we remove the old ones we can also cleanup the non-testing functions that branch on checking `instanceof InternalProcessorContext`: I'm assuming that `InternalProcessorContext` would stay in the end state, it would just extend `api.ProcessorContext<Object, Object>` and not `StateStoreContext` in the future.

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * <p>
+ * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * <p>
+ * Note that this class does not take any automated actions (such as firing scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
+    // Immutable fields ================================================
+    private final StreamsMetricsImpl metrics;
+    private final TaskId taskId;
+    private final StreamsConfig config;
+    private final File stateDir;
+
+    // settable record metadata ================================================
+    private MockRecordMetadata recordMetadata;
+
+    // mocks ================================================
+    private final Map<String, StateStore> stateStores = new HashMap<>();
+    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
+    private final List<CapturedForward<? extends KForward, ? extends VForward>> capturedForwards = new LinkedList<>();
+    private boolean committed = false;
+
+    private static final class MockRecordMetadata implements RecordMetadata {
+        private final String topic;
+        private final int partition;
+        private final long offset;
+
+        private MockRecordMetadata(final String topic, final int partition, final long offset) {
+            this.topic = topic;
+            this.partition = partition;
+            this.offset = offset;
+        }
+
+        @Override
+        public String topic() {
+            return topic;
+        }
+
+        @Override
+        public int partition() {
+            return partition;
+        }
+
+        @Override
+        public long offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information.
+     */
+    public static final class CapturedPunctuator {
+        private final Duration interval;
+        private final PunctuationType type;
+        private final Punctuator punctuator;
+        private boolean cancelled = false;
+
+        private CapturedPunctuator(final Duration interval, final PunctuationType type, final Punctuator punctuator) {
+            this.interval = interval;
+            this.type = type;
+            this.punctuator = punctuator;
+        }
+
+        public Duration getInterval() {
+            return interval;
+        }
+
+        public PunctuationType getType() {
+            return type;
+        }
+
+        public Punctuator getPunctuator() {
+            return punctuator;
+        }
+
+        public void cancel() {
+            cancelled = true;
+        }
+
+        public boolean cancelled() {
+            return cancelled;
+        }
+    }
+
+    public static final class CapturedForward<K, V> {
+
+        private final Record<K, V> record;
+        private final Optional<String> childName;
+
+        public CapturedForward(final Record<K, V> record) {
+            this(record, Optional.empty());
+        }
+
+        public CapturedForward(final Record<K, V> record, final Optional<String> childName) {
+            this.record = Objects.requireNonNull(record);
+            this.childName = Objects.requireNonNull(childName);
+        }
+
+        /**
+         * The child this data was forwarded to.
+         *
+         * @return If present, the child name the record was forwarded to.
+         *         If empty, the forward was a broadcast.
+         */
+        public Optional<String> childName() {
+            return childName;
+        }
+
+        /**
+         * The record that was forwarded.
+         *
+         * @return The forwarded record. Not null.
+         */
+        public Record<K, V> record() {
+            return record;
+        }
+
+        @Override
+        public String toString() {
+            return "CapturedForward{" +
+                "record=" + record +
+                ", childName=" + childName +
+                '}';
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            final CapturedForward<?, ?> that = (CapturedForward<?, ?>) o;
+            return Objects.equals(record, that.record) &&
+                Objects.equals(childName, that.childName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(record, childName);
+        }
+    }
+
+    // constructors ================================================
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     */
+    public MockProcessorContext() {
+        this(
+            mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
+            )),
+            new TaskId(0, 0),
+            null
+        );
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}.
+     * Most unit tests using this mock won't need to know the taskId,
+     * and most unit tests should be able to get by with the
+     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
+     *
+     * @param config a Properties object, used to configure the context and the processor.
+     */
+    public MockProcessorContext(final Properties config) {
+        this(config, new TaskId(0, 0), null);
+    }
+
+    /**
+     * Create a {@link MockProcessorContext} with a specified taskId and null stateDir.
+     *
+     * @param config   a {@link Properties} object, used to configure the context and the processor.
+     * @param taskId   a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}.
+     * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}.
+     */
+    public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
+        final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
+        this.taskId = taskId;
+        this.config = streamsConfig;
+        this.stateDir = stateDir;
+        final MetricConfig metricConfig = new MetricConfig();
+        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
+        final String threadId = Thread.currentThread().getName();
+        metrics = new StreamsMetricsImpl(
+            new Metrics(metricConfig),
+            threadId,
+            streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+            Time.SYSTEM
+        );
+        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics);
+    }
+
+    @Override
+    public String applicationId() {
+        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+    }
+
+    @Override
+    public TaskId taskId() {
+        return taskId;
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        final Map<String, Object> combined = new HashMap<>();
+        combined.putAll(config.originals());
+        combined.putAll(config.values());
+        return combined;
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return config.originalsWithPrefix(prefix);
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return config.defaultKeySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return config.defaultValueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return Objects.requireNonNull(
+            stateDir,
+            "The stateDir constructor argument was needed (probably for a state store) but not supplied. " +
+                "You can either reconfigure your test so that it doesn't need access to the disk " +
+                "(such as using an in-memory store), or use the full MockProcessorContext constructor to supply " +
+                "a non-null stateDir argument."
+        );
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return metrics;
+    }
+
+    // settable record metadata ================================================
+
+    /**
+     * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework,
+     * but for the purpose of driving unit tests, you can set them directly.
+     *
+     * @param topic     A topic name
+     * @param partition A partition number
+     * @param offset    A record offset
+     */
+    public void setRecordMetadata(final String topic,
+                                  final int partition,
+                                  final long offset) {
+        recordMetadata = new MockRecordMetadata(topic, partition, offset);
+    }
+
+    @Override
+    public Optional<RecordMetadata> recordMetadata() {
+        return Optional.ofNullable(recordMetadata);
+    }
+
+    // mocks ================================================
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <S extends StateStore> S getStateStore(final String name) {
+        return (S) stateStores.get(name);
+    }
+
+    public <S extends StateStore> void addStateStore(final S stateStore) {
+        stateStores.put(stateStore.name(), stateStore);
+    }
+
+    @Override
+    public Cancellable schedule(final Duration interval,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        final CapturedPunctuator capturedPunctuator = new CapturedPunctuator(interval, type, callback);
+
+        punctuators.add(capturedPunctuator);
+
+        return capturedPunctuator::cancel;
+    }
+
+    /**
+     * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}.
+     *
+     * @return A list of captured punctuators.
+     */
+    public List<CapturedPunctuator> scheduledPunctuators() {
+        return new LinkedList<>(punctuators);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record) {
+        forward(record, null);
+    }
+
+    @Override
+    public <K extends KForward, V extends VForward> void forward(final Record<K, V> record, final String childName) {
+        capturedForwards.add(new CapturedForward<>(record, Optional.ofNullable(childName)));
+    }
+
+    /**
+     * Get all the forwarded data this context has observed. The returned list will not be
+     * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to
+     * {@code forward(...)}.
+     *
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded() {
+        return new LinkedList<>(capturedForwards);
+    }
+
+    /**
+     * Get all the forwarded data this context has observed for a specific child by name.
+     * The returned list will not be affected by subsequent interactions with the context.
+     * The data in the list is in the same order as the calls to {@code forward(...)}.
+     *
+     * @param childName The child name to retrieve forwards for
+     * @return A list of records that were previously passed to the context.
+     */
+    public List<CapturedForward<? extends KForward, ? extends VForward>> forwarded(final String childName) {
+        final LinkedList<CapturedForward<? extends KForward, ? extends VForward>> result = new LinkedList<>();
+        for (final CapturedForward<? extends KForward, ? extends VForward> capture : capturedForwards) {
+            if (!capture.childName().isPresent() || capture.childName().equals(Optional.of(childName))) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Clear the captured forwarded data.
+     */
+    public void resetForwards() {
+        capturedForwards.clear();
+    }
+
+    @Override
+    public void commit() {
+        committed = true;
+    }
+
+    /**
+     * Whether {@link ProcessorContext#commit()} has been called in this context.
+     *
+     * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset.
+     */
+    public boolean committed() {
+        return committed;
+    }
+
+    /**
+     * Reset the commit capture to {@code false} (whether or not it was previously {@code true}).
+     */
+    public void resetCommit() {
+        committed = false;
+    }
+
+    @Override
+    public RecordCollector recordCollector() {
+        // This interface is assumed by state stores that add change-logging.
+        // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception.
+
+        throw new UnsupportedOperationException(
+            "MockProcessorContext does not provide record collection. " +
+                "For processor unit tests, use an in-memory state store with change-logging disabled. " +
+                "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration."
+        );
+    }
+
+    /**
+     * Used to get a {@link StateStoreContext} for use with
+     * {@link StateStore#init(StateStoreContext, StateStore)}
+     * if you need to initialize a store for your tests.
+     * @return a {@link StateStoreContext} that delegates to this ProcessorContext.
+     */
+    public StateStoreContext getStateStoreContext() {

Review comment:
       I agree. I think in the long run MockProcessorContext could extend InternalProcessorContext, which only extends api.ProcessorContext. When that's the case we should be able to remove the non-testing code's safeguards.

##########
File path: streams/test-utils/src/test/java/org/apache/kafka/streams/test/wordcount/WindowedWordCountProcessorTest.java
##########
@@ -136,54 +133,18 @@ public void shouldWorkWithPersistentStore() throws IOException {
             context.scheduledPunctuators().get(0).getPunctuator().punctuate(1_000L);
 
             // finally, we can verify the output.
-            final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator();
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[alpha@100/200]", "2")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[beta@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[delta@200/300]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@100/200]", "1")));
-            assertThat(capturedForwards.next().keyValue(), is(new KeyValue<>("[gamma@200/300]", "1")));
-            assertThat(capturedForwards.hasNext(), is(false));
+            final List<CapturedForward<? extends String, ? extends String>> capturedForwards = context.forwarded();
+            final List<CapturedForward<? extends String, ? extends String>> expected = asList(
+                new CapturedForward<>(new Record<>("[alpha@100/200]", "2", 1_000L)),
+                new CapturedForward<>(new Record<>("[beta@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[delta@200/300]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@100/200]", "1", 1_000L)),
+                new CapturedForward<>(new Record<>("[gamma@200/300]", "1", 1_000L))
+            );
+
+            assertThat(capturedForwards, is(expected));
         } finally {
             Utils.delete(stateDir);
         }
     }
-
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
-    @Test
-    public void shouldFailWithLogging() {

Review comment:
       Can we remove those two tests?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org