Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/14 22:31:40 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #8669: MINOR: consolidate processor context for active/standby

ableegoldman commented on a change in pull request #8669:
URL: https://github.com/apache/kafka/pull/8669#discussion_r425467074



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -198,433 +196,51 @@ public StateStore getStateStore(final String name) {
 
     @Override
     public void commit() {
-        task.requestCommit();
+        throwUnsupportedOperationExceptionIfStandby("commit");
+        applyStreamTaskOperation(StreamTask::requestCommit);
     }
 
     @Override
     @Deprecated
     public Cancellable schedule(final long intervalMs,
                                 final PunctuationType type,
                                 final Punctuator callback) {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         if (intervalMs < 1) {
             throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
         }
-        return task.schedule(intervalMs, type, callback);
+        return returnStreamTaskOperation(t -> t.schedule(intervalMs, type, callback));
     }
 
     @SuppressWarnings("deprecation") // removing #schedule(final long intervalMs,...) will fix this
     @Override
     public Cancellable schedule(final Duration interval,
                                 final PunctuationType type,
                                 final Punctuator callback) throws IllegalArgumentException {
+        throwUnsupportedOperationExceptionIfStandby("schedule");
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(interval, "interval");
         return schedule(ApiUtils.validateMillisecondDuration(interval, msgPrefix), type, callback);
     }
 
-    private abstract static class StateStoreReadOnlyDecorator<T extends StateStore, K, V>

Review comment:
       I felt the `StateStoreReadOnlyDecorator` classes (and the other store decorators defined alongside them) were just cluttering up this class, so I moved them to a new file.
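
       For readers without the full PR open, here is a minimal sketch of how the guard and delegation helpers called in the hunk above might fit together. The helper names (`throwUnsupportedOperationExceptionIfStandby`, `applyStreamTaskOperation`, `returnStreamTaskOperation`) come from the diff; everything else (`TaskType`, `SketchTask`, `SketchContext`, the exception message, and the `String` return value standing in for `Cancellable`) is a hypothetical stand-in, not the actual Kafka Streams implementation.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the active/standby distinction.
enum TaskType { ACTIVE, STANDBY }

// Hypothetical stand-in for StreamTask; the real class is far richer.
class SketchTask {
    final TaskType type;
    boolean commitRequested = false;

    SketchTask(final TaskType type) {
        this.type = type;
    }

    void requestCommit() {
        commitRequested = true;
    }

    // Returns a String here; the real method returns a Cancellable.
    String schedule(final long intervalMs) {
        return "scheduled every " + intervalMs + " ms";
    }
}

// Hypothetical stand-in for the consolidated processor context.
class SketchContext {
    private final SketchTask task;

    SketchContext(final SketchTask task) {
        this.task = task;
    }

    // Guard: reject operations that only make sense on an active task.
    // The message text is an assumption, not the one used in Kafka.
    private void throwUnsupportedOperationExceptionIfStandby(final String operationName) {
        if (task.type == TaskType.STANDBY) {
            throw new UnsupportedOperationException(
                operationName + "() is not supported in standby tasks");
        }
    }

    // Delegate a side-effecting operation to the underlying task.
    private void applyStreamTaskOperation(final Consumer<SketchTask> operation) {
        operation.accept(task);
    }

    // Delegate an operation that produces a result to the underlying task.
    private <T> T returnStreamTaskOperation(final Function<SketchTask, T> operation) {
        return operation.apply(task);
    }

    public void commit() {
        throwUnsupportedOperationExceptionIfStandby("commit");
        applyStreamTaskOperation(SketchTask::requestCommit);
    }

    public String schedule(final long intervalMs) {
        throwUnsupportedOperationExceptionIfStandby("schedule");
        if (intervalMs < 1) {
            throw new IllegalArgumentException(
                "The minimum supported scheduling interval is 1 millisecond.");
        }
        return returnStreamTaskOperation(t -> t.schedule(intervalMs));
    }
}
```

       In this sketch the guard runs before any argument validation, mirroring the order in the hunk, so a standby context fails with `UnsupportedOperationException` even when the arguments are also invalid.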



