You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/16 16:47:36 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

ableegoldman commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r489580109



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -45,33 +49,38 @@
         "Please report at https://issues.apache.org/jira/projects/KAFKA or dev-mailing list (https://kafka.apache.org/contact).";
 
     private final Logger log;
-    private final long windowChangeLogAdditionalRetention;
-    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
 
-    private final short replicationFactor;
+    private final Time time;
     private final Admin adminClient;
 
-    private final int retries;
+    private final short replicationFactor;
+    private final long windowChangeLogAdditionalRetention;
     private final long retryBackOffMs;
+    private final long retryTimeoutMs;
+
+    private final Map<String, String> defaultTopicConfigs = new HashMap<>();
 
-    @SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed
-    public InternalTopicManager(final Admin adminClient, final StreamsConfig streamsConfig) {
+    public InternalTopicManager(final Time time,
+                                final Admin adminClient,
+                                final StreamsConfig streamsConfig) {
+        this.time = time;
         this.adminClient = adminClient;
 
         final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName()));
         log = logContext.logger(getClass());
 
         replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
         windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
-        final AdminClientConfig adminConfigs = new ClientUtils.QuietAdminClientConfig(streamsConfig);
-        retries = adminConfigs.getInt(AdminClientConfig.RETRIES_CONFIG);
-        retryBackOffMs = adminConfigs.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
+        retryBackOffMs = streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        final Map<String, Object> consumerConfig = streamsConfig.getMainConsumerConfigs("dummy", "dummy", -1);
+        // need to add mandatory configs; otherwise `QuietConsumerConfig` throws
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L;

Review comment:
       Hey @mjsax , am I reading this PR correctly? Do we now only allow a single member to retry topic creation/validation for up to half of the poll interval, after which we shut down the entire application? That sounds like the opposite of resiliency...what if the brokers are temporarily unavailable? Before this we would just let the single thread die, and the internal topic creation/validation would be retried on the subsequent rebalance. That wasn't ideal, but given the upcoming work to allow reviving/recreating a death thread, that seems to be preferable to permanently ending the application? 
   
   Sorry if I'm misreading this, was just going over all the PRs in the last month or so to produce a diff+summary of the important ones, and want to make sure I actually understand all the changes we've made




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org