Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/28 20:55:35 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext

vvcephei opened a new pull request #9346:
URL: https://github.com/apache/kafka/pull/9346


   Split up the ProcessorContext into separate containers more appropriate
   for usage in `Processor#init` (`api.ProcessorContext`), 
   `StateStore#init` (`StateStoreContext`), and `Processor#process`
   (the new `Record` and `RecordMetadata` classes).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei edited a comment on pull request #9346: KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #9346:
URL: https://github.com/apache/kafka/pull/9346#issuecomment-702209092


   Closing POC in favor of new PR: https://github.com/apache/kafka/pull/9361





[GitHub] [kafka] vvcephei closed pull request #9346: KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext

Posted by GitBox <gi...@apache.org>.
vvcephei closed pull request #9346:
URL: https://github.com/apache/kafka/pull/9346


   





[GitHub] [kafka] vvcephei commented on a change in pull request #9346: KAFKA-8410: Split ProcessorContext into Processor/StateStore/RecordContext

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9346:
URL: https://github.com/apache/kafka/pull/9346#discussion_r496227502



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -90,7 +90,7 @@ void register(final StateStore store,
      * @param name The store name
      * @return The state store instance
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       This is a backward-compatible change. It's necessary if we want to implement all the new interfaces with a common ProcessorContextImpl (which I think we do).
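
       A minimal sketch of why the generic return type is convenient for callers. `DemoStore`, `CounterStore`, and `StoreRegistry` are hypothetical stand-ins invented for illustration, not Kafka Streams types; only the shape of the `<S extends ...> S get...(String)` signature is taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state store interface.
interface DemoStore {
    String name();
}

// Hypothetical concrete store with one extra method.
class CounterStore implements DemoStore {
    private long count;

    @Override
    public String name() {
        return "counter";
    }

    long increment() {
        return ++count;
    }
}

// Hypothetical stand-in for the context that hands out stores by name.
class StoreRegistry {
    private final Map<String, DemoStore> stores = new HashMap<>();

    void register(final DemoStore store) {
        stores.put(store.name(), store);
    }

    // The return type is inferred from the call site, so callers need no explicit cast.
    // The cast here is unchecked: asking for the wrong type fails with a
    // ClassCastException at the call site instead.
    @SuppressWarnings("unchecked")
    <S extends DemoStore> S getStore(final String name) {
        return (S) stores.get(name);
    }
}
```

       Call sites that assign to the base type, or that cast the result explicitly, keep compiling unchanged, while new call sites can assign directly to the concrete store type.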

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
       The new StateStoreContext. It contains only the parts of the ProcessorContext that are appropriate to use within a StateStore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {

Review comment:
       The new data object.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+public interface RecordMetadata {

Review comment:
       `RecordMetadata` is an interface rather than a data class, since users will never construct it. This also lets us reuse the ProcessorContextImpl as its implementation.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +125,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, final String childName);

Review comment:
       Here are the symmetrical `forward` calls. Note that I dropped `To` from the API, since the timestamp now flows through `Record`, and the only other piece of information it carried was whether to forward to one named child or to all children. If desired, we could add a new "To-esque" object instead of just a String child name (if we think we may want to extend it in the future).
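
       A rough sketch of the two `forward` shapes, assuming a simplified model in which each child is just a named inbox. `SketchRec` and `ForwardSketch` are hypothetical names for illustration and are not the classes in this PR; the point is only that the timestamp travels inside the record, so the routing argument can shrink to a child name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed Record; only the fields needed here.
final class SketchRec<K, V> {
    final K key;
    final V value;
    final long timestamp;

    SketchRec(final K key, final V value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for a context that routes records to named children.
final class ForwardSketch<K, V> {
    private final Map<String, List<SketchRec<K, V>>> children = new LinkedHashMap<>();

    void addChild(final String name) {
        children.put(name, new ArrayList<>());
    }

    // Forward to every child.
    void forward(final SketchRec<K, V> record) {
        for (final List<SketchRec<K, V>> inbox : children.values()) {
            inbox.add(record);
        }
    }

    // Forward to one child, selected by name.
    void forward(final SketchRec<K, V> record, final String childName) {
        final List<SketchRec<K, V>> inbox = children.get(childName);
        if (inbox == null) {
            throw new IllegalArgumentException("Unknown child: " + childName);
        }
        inbox.add(record);
    }

    List<SketchRec<K, V>> received(final String childName) {
        return children.get(childName);
    }
}
```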

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    public Record(final K key, final V value, final long timestamp, final Headers headers) {
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = StreamsHeaders.fromHeaders(headers);
+    }
+
+    public Record(final K key, final V value, final long timestamp) {
+        this.key = key;
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = StreamsHeaders.emptyHeaders();
+    }
+
+    public K key() {
+        return key;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public Headers headers() {
+        return headers;
+    }
+
+    public <NewK> Record<NewK, V> withKey(final NewK key) {
+        return new Record<>(key, value, timestamp, headers);
+    }

Review comment:
       Note that each of these methods returns a shallow copy of the record, so the class itself is immutable (though the key, value, and headers it references may be mutable).
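
       To illustrate the shallow-copy behavior, here is a small sketch. `RecordSketch` is a hypothetical, headers-free stand-in for the class in the diff; `withKey` mirrors the method shown above, while `withValue` and `withTimestamp` are assumed siblings that are not visible in this excerpt.

```java
// Hypothetical, simplified stand-in for the proposed Record class (headers omitted).
final class RecordSketch<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;

    RecordSketch(final K key, final V value, final long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    K key() {
        return key;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }

    // Each "with" method builds a new object and leaves this one untouched.
    <NewK> RecordSketch<NewK, V> withKey(final NewK newKey) {
        return new RecordSketch<>(newKey, value, timestamp);
    }

    // Assumed sibling of withKey; not shown in the excerpt above.
    <NewV> RecordSketch<K, NewV> withValue(final NewV newValue) {
        return new RecordSketch<>(key, newValue, timestamp);
    }

    // Assumed sibling of withKey; not shown in the excerpt above.
    RecordSketch<K, V> withTimestamp(final long newTimestamp) {
        return new RecordSketch<>(key, value, newTimestamp);
    }
}
```

       Because the copy is shallow, the original and the copy share the same value object: mutating a mutable value through one is visible through the other.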

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
##########
@@ -46,12 +47,12 @@
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
+     * @param recordMetadata the metadata of the record, if it is defined
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record, Optional<RecordMetadata> recordMetadata);

Review comment:
       Here's what the updated `Processor` API looks like. The "data" fields are in the `Record` (note that `ProcessorContext#forward` also accepts this class), and the "context" fields are in `RecordMetadata`, which is Optional, since it's undefined sometimes (like during a punctuation). 
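
       As a sketch of what handling the optional metadata might look like inside `process`: `MetaSketch` and `DescribeSketch` are hypothetical names, and the `topic()`, `partition()`, and `offset()` accessors are an assumption, since the body of `RecordMetadata` is not shown in this excerpt.

```java
import java.util.Optional;

// Hypothetical stand-in for RecordMetadata; the accessor names are assumed.
interface MetaSketch {
    String topic();

    int partition();

    long offset();
}

final class DescribeSketch {
    private DescribeSketch() {}

    // Handles both cases: metadata present (a record read from a topic)
    // and metadata absent (for example, a forward from a punctuator).
    static String describe(final String key, final Optional<MetaSketch> metadata) {
        return metadata
            .map(m -> key + " from " + m.topic() + "-" + m.partition() + "@" + m.offset())
            .orElse(key + " (no source metadata)");
    }
}
```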

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,9 @@
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }

Review comment:
       Here's where the newly proposed StateStoreContext links in.
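
       The bridging pattern, sketched with hypothetical types (`LegacyContext`, `StoreOnlyContext`, `BridgedStore`, `LegacyStoreImpl`): a default method on the interface adapts the new, narrower context to the old one, so existing store implementations that only override the old `init` keep working. How the real `StoreToProcessorContextAdapter` treats calls that have no equivalent in the narrower context is not shown here; throwing `UnsupportedOperationException` is an assumption of this sketch.

```java
// Hypothetical stand-in for the old, wide context.
interface LegacyContext {
    String applicationId();

    String topic();
}

// Hypothetical stand-in for the new, narrower store context.
interface StoreOnlyContext {
    String applicationId();
}

interface BridgedStore {
    // The method existing implementations already override.
    void init(LegacyContext context);

    // New entry point: by default, adapt the narrow context and delegate to the old method.
    default void init(final StoreOnlyContext context) {
        init(adapt(context));
    }

    static LegacyContext adapt(final StoreOnlyContext context) {
        return new LegacyContext() {
            @Override
            public String applicationId() {
                return context.applicationId();
            }

            @Override
            public String topic() {
                // Assumption: record-level calls are not available through a store context.
                throw new UnsupportedOperationException("topic() is not available in a store context");
            }
        };
    }
}

// An "old" implementation that knows nothing about StoreOnlyContext.
class LegacyStoreImpl implements BridgedStore {
    String seenApplicationId;

    @Override
    public void init(final LegacyContext context) {
        seenApplicationId = context.applicationId();
    }
}
```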

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/api/StreamsHeaders.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
+
+public class StreamsHeaders implements Headers {

Review comment:
       This is an immutable Headers implementation. Whenever you add or remove a header, you actually get a new, independent object. This is the real, fundamental solution to the problem we patched in https://github.com/apache/kafka/pull/8181
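
       A simplified sketch of the copy-on-write idea, not the `StreamsHeaders` implementation itself. `ImmutableHeadersSketch` is a hypothetical class that stores `String` values for brevity (real Kafka headers carry `byte[]` values) and does not implement the `Headers` interface.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical immutable header container: add and remove return new objects.
final class ImmutableHeadersSketch {
    private final List<Map.Entry<String, String>> entries;

    private ImmutableHeadersSketch(final List<Map.Entry<String, String>> entries) {
        this.entries = Collections.unmodifiableList(entries);
    }

    static ImmutableHeadersSketch empty() {
        return new ImmutableHeadersSketch(new ArrayList<>());
    }

    // Returns a new instance containing the extra header; this instance is unchanged.
    ImmutableHeadersSketch add(final String key, final String value) {
        final List<Map.Entry<String, String>> copy = new ArrayList<>(entries);
        copy.add(new SimpleImmutableEntry<>(key, value));
        return new ImmutableHeadersSketch(copy);
    }

    // Returns a new instance without any header of the given key; this instance is unchanged.
    ImmutableHeadersSketch remove(final String key) {
        final List<Map.Entry<String, String>> copy = entries.stream()
            .filter(entry -> !entry.getKey().equals(key))
            .collect(Collectors.toList());
        return new ImmutableHeadersSketch(copy);
    }

    // Returns the most recently added value for the key, or null if there is none.
    String lastValue(final String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).getKey().equals(key)) {
                return entries.get(i).getValue();
            }
        }
        return null;
    }

    int size() {
        return entries.size();
    }
}
```

       Since no instance ever changes after construction, a processor that adds a header to its copy cannot affect the headers seen by a sibling processor holding the original.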

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -147,7 +148,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       When you see this cast, it's because the concrete context type implements both StateStoreContext and ProcessorContext, and `StateStore#init` has an overload for each, so I had to select the method we want to invoke.
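
       The overload issue in isolation, using hypothetical types (`OldCtx`, `NewCtx`, `BothCtx`, `OverloadSketch`): when the argument's static type implements both parameter interfaces, an uncast call is ambiguous and does not compile, so a cast is needed to pick the overload.

```java
// Hypothetical stand-ins for the two context interfaces.
interface OldCtx {}

interface NewCtx {}

// A concrete type that implements both, like the internal context implementation.
class BothCtx implements OldCtx, NewCtx {}

final class OverloadSketch {
    private OverloadSketch() {}

    static String init(final OldCtx context) {
        return "old";
    }

    static String init(final NewCtx context) {
        return "new";
    }

    static String initWithNewContext(final BothCtx context) {
        // init(context) would be rejected by the compiler as ambiguous,
        // because BothCtx matches both overloads equally well.
        return init((NewCtx) context);
    }
}
```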

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -156,9 +157,10 @@ public long offset() {
     @Override
     public Headers headers() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as headers() should only be called while a record is processed");
+            return StreamsHeaders.emptyHeaders();

Review comment:
       I did some hacking around in this PR to make things work reasonably. I'm not sure I always made exactly the right call, but the main point was to prototype the proposed public API, which seems to work nicely.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends ProcessorContext<KForward, VForward> {

Review comment:
       Because the updated API no longer clashes with the old one, we don't need a separate implementation anymore.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
##########
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, InternalApiProcessorContext<KForward, VForward> {

Review comment:
       Because the updated API no longer clashes with the old one, we also don't need these adapters anymore.







[GitHub] [kafka] vvcephei commented on pull request #9346: KAFKA-10535: Split ProcessorContext into Processor/StateStore/RecordContext

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9346:
URL: https://github.com/apache/kafka/pull/9346#issuecomment-702209092


   Closing POC in favor of new PR.

