You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/07 06:36:58 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817292


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+                final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                final Class<? extends Connector> connClass = plugins.connectorClass(
+                        connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+                Map<String, Object> adminConfig = adminConfigs(
+                        connName,
+                        "connector-worker-adminclient-" + connName,
+                        config,
+                        connConfig,
+                        connClass,
+                        connectorClientConfigOverridePolicy,
+                        kafkaClusterId,
+                        ConnectorType.SOURCE);
+                Admin admin = adminFactory.apply(adminConfig);

Review Comment:
   Good catch, done.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -582,6 +588,59 @@ public boolean startTask(
         }
     }
 
+    /**
+     * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+     * for the specified number of source tasks from sending any more records.
+     * @param connName the name of the connector
+     * @param numTasks the number of tasks to fence out
+     * @param connProps the configuration of the connector; may not be null
+     * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+     */
+    public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
+        return fenceZombies(connName, numTasks, connProps, Admin::create);
+    }
+
+    // Allows us to mock out the Admin client for testing
+    KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps, Function<Map<String, Object>, Admin> adminFactory) {
+        log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+        try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+            ClassLoader savedLoader = plugins.currentThreadLoader();
+            try {
+                String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+                ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+                savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

Review Comment:
   Ah yeah, been toying with that idea for a while but never got around to trying it out. Works pretty well in this case; the one wrinkle is that the signature for `AutoCloseable::close` includes a checked exception. I've added a new (internal) `LoaderSwap` class that implements `AutoCloseable` and removes that checked exception to address that.
   
   If this looks good, we can retrofit other parts of the code base to leverage it in a follow-up.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,17 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature);
 
+    /**
+     * Fence out any older task generations for a source connector, and then write a record to the config topic
+     * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing
+     * and invoke the callback successfully.
+     * @param connName the name of the connector to fence out; must refer to a source connector

Review Comment:
   The callback is invoked with an error (added to Javadocs)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org