Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/07 03:23:52 UTC

[GitHub] [kafka] vvcephei opened a new pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

vvcephei opened a new pull request #9388:
URL: https://github.com/apache/kafka/pull/9388


   * all wrapping stores should pass StateStoreContext init through to the same
     method on the wrapped store and not translate it to ProcessorContext init
   * base-level stores should handle StateStoreContext init so that callers passing
     a non-InternalProcessorContext implementation will be able to initialize the store
   * extra tests are added to verify the desired behavior
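
The pass-through behavior described in the first bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `OldContext`, `NewContext`, `Store`, `RecordingStore`, and `WrappingStore` are hypothetical stand-ins for `ProcessorContext`, `StateStoreContext`, `StateStore`, and the real store classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; these are NOT the real Kafka Streams interfaces.
final class InitPassThroughSketch {
    interface OldContext { }   // stands in for ProcessorContext
    interface NewContext { }   // stands in for StateStoreContext

    interface Store {
        @Deprecated
        void init(OldContext context, Store root);

        void init(NewContext context, Store root);
    }

    // Base-level store: records which init overload was invoked.
    static final class RecordingStore implements Store {
        final List<String> calls = new ArrayList<>();

        @Deprecated
        @Override
        public void init(final OldContext context, final Store root) {
            calls.add("old");
        }

        @Override
        public void init(final NewContext context, final Store root) {
            calls.add("new");
        }
    }

    // Wrapping store: forwards each overload to the SAME overload on the
    // wrapped store instead of translating one into the other.
    static final class WrappingStore implements Store {
        private final Store wrapped;

        WrappingStore(final Store wrapped) {
            this.wrapped = wrapped;
        }

        @Deprecated
        @Override
        public void init(final OldContext context, final Store root) {
            wrapped.init(context, root);
        }

        @Override
        public void init(final NewContext context, final Store root) {
            wrapped.init(context, root);
        }
    }

    public static void main(final String[] args) {
        final RecordingStore inner = new RecordingStore();
        final Store outer = new WrappingStore(inner);
        outer.init(new NewContext() { }, outer);
        outer.init(new OldContext() { }, outer);
        System.out.println(inner.calls); // prints [new, old]
    }
}
```

In this sketch, the overload that the caller picks is the one that reaches the innermost store, which is the property the new tests are meant to verify.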
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501432967



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        taskId = context.taskId().toString();
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       huh. I'll double-check and take it out.







[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501432866



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -29,7 +29,9 @@
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;

Review comment:
       Oh, probably just overlooked it.







[GitHub] [kafka] vvcephei commented on pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#issuecomment-705316202


   Thanks for the review, @guozhangwang! I'll go ahead and merge this and deal with the couple of cleanup issues in a follow-on PR.





[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r500715702



##########
File path: streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
##########
@@ -34,6 +34,7 @@
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
+    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437

Review comment:
       This ticket also needs to go into 2.7.0, but I split it out for reviewability.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
##########
@@ -45,12 +46,19 @@ public void flush() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
 
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        throw new UnsupportedOperationException(ERROR_MESSAGE);

Review comment:
       There are going to be a lot of duplicated init methods. It's not great, but hopefully we can drop the old API before too long.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }
+
     public static String changelogFor(final ProcessorContext context, final String storeName) {
         return context instanceof InternalProcessorContext
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static String changelogFor(final StateStoreContext context, final String storeName) {
+        return context instanceof InternalProcessorContext
+            ? ((InternalProcessorContext) context).changelogFor(storeName)
+            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+    }
+
+    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+            );
+        }
+    }

Review comment:
       I replaced a lot of casts with this checked-cast method, which also lets us get rid of a lot of similar cast-checking blocks, which were inconsistently applied.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
       I wasn't able to extract out quite as much common code in the Metered implementations because they need to work regardless of whether the context is an InternalProcessorContext or whether it's a straight mock (for unit tests).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -65,7 +65,11 @@
      *
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
+     * @deprecated Since 2.7.0. Callers should invoke {@link this#init(StateStoreContext, StateStore)} instead.
+     *             Implementers may choose to implement this method for backward compatibility or to throw an
+     *             informative exception instead.
      */
+    @Deprecated

Review comment:
       Adding the deprecation tag right now lets us be sure we encountered all places this method appears in the codebase.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -225,6 +225,7 @@ public String name() {
         return name;
     }
 
+    @Deprecated

Review comment:
       There are a handful of these also, just passing the deprecation on to the callers.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
##########
@@ -45,10 +46,9 @@ public int compareTo(final KeyValueSegment segment) {
     }
 
     @Override
-    public void openDB(final ProcessorContext context) {
-        super.openDB(context);
+    public void openDB(final Map<String, Object> configs, final File stateDir) {
+        super.openDB(configs, stateDir);

Review comment:
       I was able to remove the type-dependency of the context by re-specifying the interface in terms of the only two properties it needed.
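
A rough sketch of that idea, under simplifying assumptions: `OldContext`, `NewContext`, and `Store` below are hypothetical stand-ins rather than the real Kafka Streams classes, and `openDB` only records its arguments instead of opening a database. The point is that once `openDB` takes only the configs and the state directory, both `init` overloads can share it.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the real Kafka Streams types.
final class OpenDbSketch {
    interface OldContext {          // stands in for ProcessorContext
        Map<String, Object> appConfigs();
        File stateDir();
    }

    interface NewContext {          // stands in for StateStoreContext
        Map<String, Object> appConfigs();
        File stateDir();
    }

    static final class Store {
        Map<String, Object> openedWith;
        File openedIn;

        @Deprecated
        void init(final OldContext context) {
            openDB(context.appConfigs(), context.stateDir());
        }

        void init(final NewContext context) {
            openDB(context.appConfigs(), context.stateDir());
        }

        // Depends only on the two values it needs, so a single method
        // serves both init overloads regardless of the context type.
        void openDB(final Map<String, Object> configs, final File stateDir) {
            this.openedWith = configs;
            this.openedIn = stateDir;
        }
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("example.key", "example-value"); // made-up config entry
        final File dir = new File("state-dir");      // made-up directory name

        final Store store = new Store();
        store.init(new NewContext() {
            @Override
            public Map<String, Object> appConfigs() {
                return configs;
            }

            @Override
            public File stateDir() {
                return dir;
            }
        });
        System.out.println(store.openedIn + " " + store.openedWith);
    }
}
```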

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }

Review comment:
       Added a bunch of duplicated extractors here to help keep the implementation classes clean.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -122,7 +122,7 @@
     }
 
     @SuppressWarnings("unchecked")
-    void openDB(final ProcessorContext context) {
+    void openDB(final Map<String, Object> configs, final File stateDir) {

Review comment:
       Here's the interface change that saved us from needing two openDB methods.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
##########
@@ -100,6 +100,31 @@ public void after() {
         return store;
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldDelegateDeprecatedInit() {

Review comment:
       These are the new unit tests I added to make sure that all the store builder wrappers transmit init calls correctly. They are frustratingly similar, but not exactly the same across different test classes because the test classes follow different idioms.
   
   I think it'd be nice to follow up with a general store-verification test that's parameterized by the exact store types so we can specify this test logic once and apply it to all the stores. That would also be handy for most of the rest of these tests. But I don't think we need to distract this PR with that concern.
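
One possible shape for such a shared check, sketched in plain Java rather than with a JUnit parameterized runner. Everything here is an assumption for illustration: `Store`, `NewContext`, `CountingStore`, and the two wrapper classes are hypothetical stand-ins, not the real store types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins; a real version would be parameterized over the actual store types.
final class StoreVerificationSketch {
    interface NewContext { }   // stands in for StateStoreContext

    interface Store {
        void init(NewContext context, Store root);
    }

    // Innermost store that counts how often init reaches it.
    static final class CountingStore implements Store {
        int initCalls;

        @Override
        public void init(final NewContext context, final Store root) {
            initCalls++;
        }
    }

    static final class CachingWrapper implements Store {
        private final Store wrapped;

        CachingWrapper(final Store wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void init(final NewContext context, final Store root) {
            wrapped.init(context, root);
        }
    }

    static final class LoggingWrapper implements Store {
        private final Store wrapped;

        LoggingWrapper(final Store wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public void init(final NewContext context, final Store root) {
            wrapped.init(context, root);
        }
    }

    // The check is written once and applied to any wrapper factory.
    static boolean delegatesInit(final Function<Store, Store> wrapperFactory) {
        final CountingStore inner = new CountingStore();
        final Store outer = wrapperFactory.apply(inner);
        outer.init(new NewContext() { }, outer);
        return inner.initCalls == 1;
    }

    static Map<String, Boolean> verifyAll() {
        final Map<String, Function<Store, Store>> wrappers = new LinkedHashMap<>();
        wrappers.put("caching", CachingWrapper::new);
        wrappers.put("logging", LoggingWrapper::new);

        final Map<String, Boolean> results = new LinkedHashMap<>();
        wrappers.forEach((name, factory) -> results.put(name, delegatesInit(factory)));
        return results;
    }

    public static void main(final String[] args) {
        System.out.println(verifyAll()); // prints {caching=true, logging=true}
    }
}
```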

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
##########
@@ -147,7 +147,7 @@ public void shouldNotAllowToSchedulePunctuations() {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init((StateStoreContext) null, null);

Review comment:
       This was actually a bug before, which this PR fixes: the wrapping layers should transmit the init call straight down, rather than translating it. There is a whole set of new unit tests making sure that this works properly for both the new and old init methods.







[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501433923



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        taskId = context.taskId().toString();
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       Actually, I'll do that in a quick follow-up PR, so I can go ahead and merge this.







[GitHub] [kafka] vvcephei commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501435613



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -122,7 +158,7 @@ public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listene
     @Override
     public void put(final K key,
                     final V value) {
-        put(key, value, context.timestamp());
+        put(key, value, context != null ? context.timestamp() : 0L);

Review comment:
       Yeah, it's not something I normally like to do, either. In this case, though, it's necessary. The thing is that all our internal StateStoreContexts are InternalProcessorContext implementations, and therefore, they are also ProcessorContext implementations, so they have a `timestamp()` method.
   
   The thing that makes this unavoidable is that it's ok for users to `init` a state store using the MockProcessorContext we provide for them in `test-utils`. This is a bit of a bleed-over from the _next_ PR, which I'm still finishing up, but it's better if we keep their context "pure". I.e., I'm going to propose adding a new context that's _just_ an `api.ProcessorContext` and a separate implementation that's _just_ a `StateStoreContext`. We should discuss on that PR whether that's really the best way to present it, but if you ultimately agree, then it means we have to expect a null context here.
   
   Note that the only functionality it affects is the recording of metrics that probably don't matter in unit tests and this stub behavior for a deprecated method that people shouldn't be using.
   
   If after reviewing the next PR, we do wind up converging the implementations, I'll come back and undo these checks here.
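
To make the null-context case concrete, here is a minimal sketch under stated assumptions: `InternalContext`, `FixedTimeContext`, and `WindowStoreSketch` are hypothetical stand-ins, and `init` takes a plain `Object` only to keep the example short. It mirrors the pattern in the diff, where the store keeps the context only if it is the internal type and falls back to `0L` otherwise.

```java
// Hypothetical stand-ins; not the real Kafka Streams classes.
final class NullableContextSketch {
    interface InternalContext {    // stands in for InternalProcessorContext
        long timestamp();
    }

    static final class FixedTimeContext implements InternalContext {
        private final long timestamp;

        FixedTimeContext(final long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }

    static final class WindowStoreSketch {
        // Stays null when init receives a context that is not the internal type,
        // for example a "pure" mock context in a unit test.
        private InternalContext context;
        long lastPutTimestamp = -1L;

        void init(final Object anyContext) {
            this.context = anyContext instanceof InternalContext ? (InternalContext) anyContext : null;
        }

        // Deprecated-style put without an explicit timestamp: use the record
        // timestamp when available, otherwise fall back to 0L.
        void put(final String key, final String value) {
            put(key, value, context != null ? context.timestamp() : 0L);
        }

        void put(final String key, final String value, final long timestamp) {
            lastPutTimestamp = timestamp;
        }
    }

    public static void main(final String[] args) {
        final WindowStoreSketch store = new WindowStoreSketch();

        store.init(new Object());               // non-internal context
        store.put("k", "v");
        System.out.println(store.lastPutTimestamp); // prints 0

        store.init(new FixedTimeContext(42L));  // internal context
        store.put("k", "v");
        System.out.println(store.lastPutTimestamp); // prints 42
    }
}
```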







[GitHub] [kafka] vvcephei merged pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9388:
URL: https://github.com/apache/kafka/pull/9388


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#discussion_r501347228



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();
+        final Sensor restoreSensor =
+            StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
+
+        // register and possibly restore the state from the logs
+        maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
+    }
+
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        taskId = context.taskId().toString();
+        initStoreSerde(context);
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       Actually, I think we should remove the `restoreSensor`, since we no longer restore the state upon init? In KIP-444, we no longer have it as a state-store-level metric.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -122,7 +158,7 @@ public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V> listene
     @Override
     public void put(final K key,
                     final V value) {
-        put(key, value, context.timestamp());
+        put(key, value, context != null ? context.timestamp() : 0L);

Review comment:
       Thinking about this a bit more: is `this.context` only null in unit tests? If so, it seems like overkill to make non-testing code cope with testing code.
   
   Could we let the mock class extend InternalProcessorContext as well?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##########
@@ -68,26 +70,47 @@
         this.time = time;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         initStoreSerde(context);
         taskId = context.taskId().toString();
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
-        putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        fetchSensor = StateStoreMetrics.fetchSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        removeSensor = StateStoreMetrics.removeSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
-        e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId, metricsScope, name(), streamsMetrics);
+        registerMetrics();
         final Sensor restoreSensor =
             StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics);
 
         // register and possibly restore the state from the logs
         maybeMeasureLatency(() -> super.init(context, root), time, restoreSensor);
     }
 
+    @Override
+    public void init(final StateStoreContext context,
+                     final StateStore root) {
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
+        initStoreSerde(context);
+        taskId = context.taskId().toString();
+        streamsMetrics = (StreamsMetricsImpl) context.metrics();
+
+        registerMetrics();
+        final Sensor restoreSensor =

Review comment:
       Ditto here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
       `this.context` seems to be used only in the e2e latency calculation:
   
   ```
   final long e2eLatency =  currentTime - context.timestamp();
   ```
   
   In that case, we may throw an NPE. Should we augment the condition as follows?
   
   ```
   if (e2eLatencySensor.shouldRecord() && context != null)
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java
##########
@@ -29,7 +29,9 @@
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;

Review comment:
       Why not statically import the function directly, as in other classes?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
##########
@@ -47,9 +48,42 @@ public static StreamsMetricsImpl getMetricsImpl(final ProcessorContext context)
         return (StreamsMetricsImpl) context.metrics();
     }
 
+    /**
+     * Should be removed as part of KAFKA-10217
+     */
+    public static StreamsMetricsImpl getMetricsImpl(final StateStoreContext context) {
+        return (StreamsMetricsImpl) context.metrics();
+    }
+
     public static String changelogFor(final ProcessorContext context, final String storeName) {
         return context instanceof InternalProcessorContext
             ? ((InternalProcessorContext) context).changelogFor(storeName)
             : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
     }
+
+    public static String changelogFor(final StateStoreContext context, final String storeName) {
+        return context instanceof InternalProcessorContext
+            ? ((InternalProcessorContext) context).changelogFor(storeName)
+            : ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
+    }
+
+    public static InternalProcessorContext asInternalProcessorContext(final ProcessorContext context) {
+        if (context instanceof InternalProcessorContext) {
+            return (InternalProcessorContext) context;
+        } else {
+            throw new IllegalArgumentException(
+                "This component requires internal features of Kafka Streams and must be disabled for unit tests."
+            );
+        }
+    }

Review comment:
       Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##########
@@ -83,14 +85,40 @@
         this.valueSerde = valueSerde;
     }
 
+    @Deprecated
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        this.context = context;
+        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
         taskId = context.taskId().toString();
         initStoreSerde(context);
         streamsMetrics = (StreamsMetricsImpl) context.metrics();
 
+        registerMetrics();

Review comment:
       NVM, I saw you already did this :)







[GitHub] [kafka] vvcephei commented on pull request #9388: KAFKA-10562: Properly invoke new StateStoreContext init

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9388:
URL: https://github.com/apache/kafka/pull/9388#issuecomment-705316037


   Looks like the Java 11 build only had a couple of environmental failures:
   ```
   00:52:22  org.gradle.internal.remote.internal.ConnectException: Could not connect to server [86e7da13-a292-40f6-a626-7774e2173e77 port:42903, addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1].
   00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67)
   00:52:22  	at org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36)
   00:52:22  	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:123)
   00:52:22  	at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
   00:52:22  	at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
   00:52:22  	at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   00:52:22  Caused by: java.net.ConnectException: Connection refused
   00:52:22  	at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
   00:52:22  	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:776)
   00:52:22  	at java.base/sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:120)
   00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81)
   00:52:22  	at org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54)
   00:52:22  	... 5 more
   ```

