Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/15 16:22:09 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

vvcephei commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425900580



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
##########
@@ -19,13 +19,18 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 import java.util.Map;
 
 public interface RecordCollector {
 
+    BytesSerializer BYTES_KEY_SERIALIZER = new BytesSerializer();
+    ByteArraySerializer BYTE_ARRAY_VALUE_SERIALIZER = new ByteArraySerializer();
+

Review comment:
       It doesn't seem like these need to be defined here, since they're only used outside of this interface.
   
   They have only two independent usages, and de-duplicating the instances doesn't seem important. Can we just copy them to separate constants in the classes that need them?
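
   A minimal sketch of that suggestion, not code from this PR: each class that needs a serializer holds its own private constant, and the shared interface stays free of them. `StandInSerializer`, `FirstCaller`, and `SecondCaller` are hypothetical names standing in for the real serializers and the two call sites.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a stateless serializer (assumption: not a Kafka class).
final class StandInSerializer {
    byte[] serialize(final String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// First hypothetical call site: owns its own constant instead of reading it
// from a shared interface.
final class FirstCaller {
    private static final StandInSerializer KEY_SERIALIZER = new StandInSerializer();

    static byte[] encodeKey(final String key) {
        return KEY_SERIALIZER.serialize(key);
    }
}

// Second hypothetical call site: a separate, independent constant.
final class SecondCaller {
    private static final StandInSerializer VALUE_SERIALIZER = new StandInSerializer();

    static byte[] encodeValue(final String value) {
        return VALUE_SERIALIZER.serialize(value);
    }
}
```

   The cost is one extra small object per class; the benefit is that the interface no longer exposes constants it does not use itself.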

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -218,4 +230,16 @@ public void initialize() {
     public void uninitialize() {
         initialized = false;
     }
+
+    @Override
+    public TaskType taskType() {
+        return stateManager.taskType();
+    }

Review comment:
       Looks like this doesn't need to be defaulted here. If the logic doesn't apply to all the implementing classes, it's better not to define it in the abstract class.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
##########
@@ -116,4 +119,20 @@ public void register(final StateStore store, final StateRestoreCallback stateRes
     public StateRestoreCallback stateRestoreCallback(final String storeName) {
         return restoreCallbacks.get(storeName);
     }
+
+    @Override
+    public TaskType taskType() {
+        return taskType;
+    }
+
+    public void setTaskType(final TaskType newType) {
+        taskType = newType;
+    }

Review comment:
       Looks like this is unused.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
##########
@@ -113,24 +114,26 @@ public void shouldDelegateToUnderlyingStoreWhenFetchingRange() {
     @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true);
+
         inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall().times(2);
 
         init();
-        store.put(bytesKey, value);
-        store.put(bytesKey, value);
 
         final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
         final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
-        assertThat(collector.collected().size(), equalTo(2));
-        assertThat(collector.collected().get(0).key(), equalTo(key1));
-        assertThat(collector.collected().get(0).value(), equalTo(value));
-        assertThat(collector.collected().get(0).timestamp(), equalTo(0L));
-        assertThat(collector.collected().get(1).key(), equalTo(key2));
-        assertThat(collector.collected().get(1).value(), equalTo(value));
-        assertThat(collector.collected().get(1).timestamp(), equalTo(0L));
 
-        EasyMock.verify(inner);
+        EasyMock.reset(context);
+        EasyMock.expect(context.timestamp()).andStubReturn(0L);
+        context.logChange(store.name(), key1, value, 0L);
+        context.logChange(store.name(), key2, value, 0L);

Review comment:
       We don't need `expectLastCall()` on these (and everywhere else)?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -218,4 +230,16 @@ public void initialize() {
     public void uninitialize() {
         initialized = false;
     }
+
+    @Override
+    public TaskType taskType() {
+        return stateManager.taskType();
+    }
+
+    void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
+        if (taskType() == TaskType.STANDBY) {
+            throw new UnsupportedOperationException(
+                "this should not happen: " + operationName + "() is not supported in standby tasks.");
+        }
+    }

Review comment:
       It seems like an abstraction error to have something like this in an abstract class. Much better to just move all the implementations that need it to the concrete classes.
   
   For example, it's unclear whether the logic that's protected by this method should include global tasks or not. I.e., was it intended to "throw if not Active" (and we just forgot that there are also global tasks), or "throw if not Active or Global"? I'm not asking you to answer this question; I'm pointing out that putting this in the abstract class makes the code ambiguous. Even if the code is all correct right now, it's dangerous for maintenance because it would be easy to make the mistake of forgetting about global tasks at any point in the future and introducing a bug.
   
   OTOH, if all this logic gets pushed into the implementations, then the ProcessorContextImpl can assert that it only gets Active or Standby, and it can safely use this method, while the GlobalContext can take care of itself.
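
   A rough sketch of the shape described above, using simplified stand-in types rather than the real Streams classes. `SketchTaskType`, `SketchAbstractContext`, `SketchProcessorContext`, `SketchGlobalContext`, and `guardedOperation()` are all hypothetical names, and what the global context does in `guardedOperation()` is an assumption made only for illustration.

```java
// Stand-in for the three task kinds mentioned in the review.
enum SketchTaskType { ACTIVE, STANDBY, GLOBAL }

// The abstract class only declares the contract; it carries no
// standby-specific logic.
abstract class SketchAbstractContext {
    abstract SketchTaskType taskType();

    abstract String guardedOperation();
}

// Handles active and standby tasks only, and asserts that up front.
final class SketchProcessorContext extends SketchAbstractContext {
    private final SketchTaskType taskType;

    SketchProcessorContext(final SketchTaskType taskType) {
        if (taskType != SketchTaskType.ACTIVE && taskType != SketchTaskType.STANDBY) {
            throw new IllegalArgumentException("expected ACTIVE or STANDBY but got " + taskType);
        }
        this.taskType = taskType;
    }

    @Override
    SketchTaskType taskType() {
        return taskType;
    }

    @Override
    String guardedOperation() {
        throwUnsupportedOperationExceptionIfStandby("guardedOperation");
        return "ok";
    }

    // Safe here: the constructor guarantees the type is ACTIVE or STANDBY,
    // so "not standby" can only mean "active".
    private void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
        if (taskType == SketchTaskType.STANDBY) {
            throw new UnsupportedOperationException(
                "this should not happen: " + operationName + "() is not supported in standby tasks.");
        }
    }
}

// The global context takes care of itself (assumption: it supports the operation).
final class SketchGlobalContext extends SketchAbstractContext {
    @Override
    SketchTaskType taskType() {
        return SketchTaskType.GLOBAL;
    }

    @Override
    String guardedOperation() {
        return "ok";
    }
}
```

   With this layout, the question "does the standby check also cover global tasks?" cannot arise, because the check lives in a class that never sees a global task.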

##########
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##########
@@ -75,6 +75,7 @@
     private final List<CapturedForward> capturedForwards = new LinkedList<>();
     private boolean committed = false;
 
+

Review comment:
       Not sure about this change ;) 
