Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/03 15:37:47 UTC

[GitHub] [kafka] chia7712 opened a new pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

chia7712 opened a new pull request #8978:
URL: https://github.com/apache/kafka/pull/8978


   We instantiate, configure, and use the key/value deserializers, but they are never closed.
   
   issue: https://issues.apache.org/jira/browse/KAFKA-10234
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#issuecomment-691790414


   ```
   Build / JDK 8 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   Build / JDK 15 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   Build / JDK 15 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   Build / JDK 15 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```
   These failures are unrelated to this change.





[GitHub] [kafka] chia7712 commented on pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#issuecomment-666943950


   @kkonstantine Could you please take a look?





[GitHub] [kafka] chia7712 commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449675093



##########
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##########
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
     public static AtomicInteger initCount = new AtomicInteger(0);
-    public static AtomicInteger closeCount = new AtomicInteger(0);
     public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
     public boolean isKey;
     public Map<String, ?> configs;
+    public boolean isClosed = false;

Review comment:
       I did not reuse the atomic counter because it is a static variable. By default, JUnit may run different tests in the same JVM, so the static variable can be changed by other tests.
   
   
   On the other hand, your point that the atomic integer is good enough makes sense. I will revert the boolean and remove the `static` modifier to prevent the case I described above.
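
The concern about static state can be illustrated with a minimal sketch. The classes below are hypothetical stand-ins, not the real `MockDeserializer`; they only assume that one variant keeps its close counter in a static field and the other in an instance field.

```java
import java.util.concurrent.atomic.AtomicInteger;

class StaticStateDemo {
    public static void main(String[] args) {
        // "Test A" closes the mocks it created.
        new SharedCounterMock().close();
        new PerInstanceCounterMock().close();

        // "Test B" runs later in the same JVM with a fresh mock and closes nothing.
        PerInstanceCounterMock freshMock = new PerInstanceCounterMock();

        // The static counter still carries test A's close; the instance counter starts at zero.
        System.out.println("static closeCount:   " + SharedCounterMock.closeCount.get());
        System.out.println("instance closeCount: " + freshMock.closeCount.get());
    }
}

// Hypothetical mock whose counter is static: one counter for the whole JVM.
class SharedCounterMock {
    static final AtomicInteger closeCount = new AtomicInteger(0);

    void close() {
        closeCount.incrementAndGet();
    }
}

// Hypothetical mock whose counter is an instance field: one counter per object.
class PerInstanceCounterMock {
    final AtomicInteger closeCount = new AtomicInteger(0);

    void close() {
        closeCount.incrementAndGet();
    }
}
```

With a static counter, a test either has to reset it before running or assert on a delta rather than on an absolute value.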







[GitHub] [kafka] chia7712 commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449728663



##########
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##########
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
     public static AtomicInteger initCount = new AtomicInteger(0);
-    public static AtomicInteger closeCount = new AtomicInteger(0);
     public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
     public boolean isKey;
     public Map<String, ?> configs;
+    public boolean isClosed = false;

Review comment:
       I have reverted all changes to ```MockDeserializer```, since the problem I described does not occur yet.







[GitHub] [kafka] chia7712 commented on pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#issuecomment-687725573


   @kkonstantine Could you please take a look?





[GitHub] [kafka] kkonstantine commented on a change in pull request #8978: KAFKA-10234 The key/value deserializer used by ConsoleConsumer is not…

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8978:
URL: https://github.com/apache/kafka/pull/8978#discussion_r449657463



##########
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##########
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
     public static AtomicInteger initCount = new AtomicInteger(0);
-    public static AtomicInteger closeCount = new AtomicInteger(0);
     public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
     public boolean isKey;
     public Map<String, ?> configs;
+    public boolean isClosed = false;

Review comment:
       This needs to be `volatile` or otherwise accessed with synchronization. 
   
   On multicore machines (and multiprocessors in general), a change to the value of a variable like this is not guaranteed to be visible to other threads unless a memory barrier is associated with the change, which enforces a happens-before relationship for writes to this variable. 
   
   I'm saying this without actually checking whether this code is accessed by multiple threads. Yet the presence of several atomic variables suggests that it is. 
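
A minimal sketch of the visibility point, assuming a hypothetical `ClosableMock` class (not the real `MockDeserializer`): one thread spins on the flag while another thread sets it. Declaring the field `volatile` makes the write in `close()` happen-before any later read of the field, so the spinning thread is guaranteed to see it eventually. Without `volatile`, the Java memory model gives no such guarantee.

```java
class VolatileFlagDemo {
    // Returns true if a watcher thread observed close() within the timeout.
    static boolean watcherSeesClose(long timeoutMillis) {
        ClosableMock mock = new ClosableMock();
        Thread watcher = new Thread(() -> {
            // Spin until the write made by the closing thread becomes visible.
            while (!mock.isClosed()) {
                Thread.yield();
            }
        });
        watcher.setDaemon(true); // do not keep the JVM alive if the watcher never exits
        watcher.start();

        mock.close(); // write performed on the calling thread

        try {
            watcher.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !watcher.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("watcher saw close: " + watcherSeesClose(5000));
    }
}

// Hypothetical stand-in for a mock with a per-instance closed flag.
class ClosableMock implements AutoCloseable {
    // volatile: a write to this field is visible to subsequent reads from other threads.
    private volatile boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }

    public boolean isClosed() {
        return closed;
    }
}
```

Reading and writing the flag inside `synchronized` blocks on the same lock, or using an `AtomicBoolean`, would give the same visibility guarantee.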

##########
File path: clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
##########
@@ -26,17 +26,16 @@
 
 public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
     public static AtomicInteger initCount = new AtomicInteger(0);
-    public static AtomicInteger closeCount = new AtomicInteger(0);
     public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
     public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
     public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
 
     public boolean isKey;
     public Map<String, ?> configs;
+    public boolean isClosed = false;

Review comment:
       On another note, why are we switching from an `integer` to a `boolean`? I agree that `closeCount` is not used anywhere in the code base and that this is not a public interface. But the change breaks the symmetry with `initCount` (which remains an integer), and `closeCount` could still be used by a third-party package that uses these tests (by incorrectly depending on a non-public interface). My point is, if we are doing this just for an assertion, we could use 
   
   `closeCount.get() > 0` or `closeCount.get() == 0` for the equivalent of `true` or `false` 
   
   and be on the safe side (preserving symmetry too). 
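
As a rough sketch of that suggestion, the counter can stand in for the boolean in an assertion. `CountingMock` below is a hypothetical stand-in that only mirrors the `initCount`/`closeCount` pair visible in the diff; the `resetCounters` and `wasClosed` helpers are assumptions added for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CloseCountDemo {
    // Equivalent of the proposed boolean: true once any instance has been closed.
    static boolean wasClosed() {
        return CountingMock.closeCount.get() > 0;
    }

    public static void main(String[] args) {
        CountingMock.resetCounters();
        System.out.println("closed before close(): " + wasClosed());

        CountingMock mock = new CountingMock();
        mock.close();
        System.out.println("closed after close():  " + wasClosed());
    }
}

// Hypothetical mock that keeps symmetric init/close counters.
class CountingMock implements AutoCloseable {
    public static final AtomicInteger initCount = new AtomicInteger(0);
    public static final AtomicInteger closeCount = new AtomicInteger(0);

    CountingMock() {
        initCount.incrementAndGet();
    }

    @Override
    public void close() {
        closeCount.incrementAndGet();
    }

    // Illustrative helper: clear both counters so assertions start from a known state.
    static void resetCounters() {
        initCount.set(0);
        closeCount.set(0);
    }
}
```

Keeping the counter also lets a test assert that `close()` was called exactly once, which a boolean cannot express.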



