Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/07 14:15:48 UTC

[GitHub] [kafka] mimaison commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)

mimaison commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890986946


##########
gradle/spotbugs-exclude.xml:
##########
@@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
         <Bug pattern="SF_SWITCH_FALLTHROUGH"/>
     </Match>
 
+    <Match>
+        <!--
+            Temporarily suppress warnings about unused private methods (will be used in a subsequent pull request)
+            TODO: Remove this before merging to trunk

Review Comment:
   Do you already have the PR that removes this suppression?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,18 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature);
 
+    /**
+     * Fence out any older task generations for a source connector, and then write a record to the config topic
+     * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing
+     * and invoke the callback successfully.
+     * @param connName the name of the connector to fence out, which must refer to a source connector; if the
+     *                 connector does not exist or is not a source connector, the callback will be invoked with an error
+     * @param callback callback to invoke upon completion
+     * @param requestSignature the signature of the request made for this connector;
+     *                         may be null if no signature was provided
+     */
+    void fenceZombies(String connName, Callback<Void> callback, InternalRequestSignature requestSignature);

Review Comment:
   What about `fenceZombieSourceTasks()`? I find `fenceZombies()` a bit too generic.
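   The javadoc above describes an idempotent contract: report an error for an unknown or non-source connector, succeed without doing anything if the "safe to start a new generation" record already exists, and otherwise fence older generations before writing that record. The sketch below models only that contract, using the name suggested in the comment above. It is hypothetical and simplified: it runs synchronously, and `ZombieFencingSketch`, `SketchConnectorType`, and the in-memory sets standing in for the config topic are illustrative. `SketchCallback` only mirrors the shape of Connect's `Callback<V>`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Mirrors the shape of Connect's Callback<V>: error is null on success, set on failure.
interface SketchCallback<V> {
    void onCompletion(Throwable error, V result);
}

// Hypothetical connector kinds for this sketch.
enum SketchConnectorType { SOURCE, SINK }

// Hypothetical, synchronous, in-memory model of the contract in the javadoc.
class ZombieFencingSketch {
    private final Map<String, SketchConnectorType> connectors = new HashMap<>();
    // Stands in for the config-topic record saying a new task generation may start.
    private final Set<String> fencingRecords = new HashSet<>();
    // How many times older generations were actually fenced, per connector.
    private final Map<String, Integer> fenceCounts = new HashMap<>();

    void addConnector(String name, SketchConnectorType type) {
        connectors.put(name, type);
    }

    int fenceCount(String connName) {
        return fenceCounts.getOrDefault(connName, 0);
    }

    void fenceZombieSourceTasks(String connName, SketchCallback<Void> callback) {
        SketchConnectorType type = connectors.get(connName);
        if (type == null) {
            callback.onCompletion(new IllegalArgumentException("Unknown connector: " + connName), null);
            return;
        }
        if (type != SketchConnectorType.SOURCE) {
            callback.onCompletion(new IllegalArgumentException(connName + " is not a source connector"), null);
            return;
        }
        if (fencingRecords.contains(connName)) {
            // The record is already present: do nothing and report success.
            callback.onCompletion(null, null);
            return;
        }
        // Fence out older task generations first, then write the record.
        fenceCounts.merge(connName, 1, Integer::sum);
        fencingRecords.add(connName);
        callback.onCompletion(null, null);
    }
}
```

   Calling the method twice for the same source connector fences only once. The second call succeeds without side effects, which is what makes retries from other workers safe under this model.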



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") String connector,
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
     }
 
+    @PUT
+    @Path("/{connector}/fence")
+    public Response fenceZombies(final @PathParam("connector") String connector,
+                             final @Context HttpHeaders headers,
+                             final @QueryParam("forward") Boolean forward,
+                             final byte[] requestBody) throws Throwable {
+        FutureCallback<Void> cb = new FutureCallback<>();
+        herder.fenceZombies(connector, cb, InternalRequestSignature.fromHeaders(requestBody, headers));
+        completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
+        return Response.ok().build();

Review Comment:
   Do we need this `Response.ok().build()`? Also, does this method need to return `Response` at all?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -291,7 +328,16 @@ public void start() {
         log.info("Starting KafkaConfigBackingStore");
         // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        configLog.start();
+        try {
+            configLog.start();
+        } catch (UnsupportedVersionException e) {
+            throw new ConnectException(
+                    "Enabling exactly-once support for source connectors requires a Kafka broker version that allows "
+                            + "admin clients to read consumer offsets. Disable the worker's exactly-once support "
+                            + "for source connectors, or use a new Kafka broker version.",

Review Comment:
   Should this say `newer`?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -201,6 +214,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) {
     public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
             .field("state", Schema.STRING_SCHEMA)
             .build();
+    public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct()
+            .field("tasks", Schema.INT32_SCHEMA)

Review Comment:
   Would `task_count` or even `count` (like `state`) be clearer?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -309,16 +355,70 @@ public void start() {
     @Override
     public void stop() {
         log.info("Closing KafkaConfigBackingStore");
-        try {
-            configLog.stop();
-        } finally {
-            if (ownTopicAdmin != null) {
-                ownTopicAdmin.close();
-            }
+
+        if (fencableProducer != null) {
+            Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
         }
+        Utils.closeQuietly(ownTopicAdmin, "admin for config topic");
+        Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic");
+
         log.info("Closed KafkaConfigBackingStore");
     }
 
+    @Override
+    public void claimWritePrivileges() {
+        if (usesFencableWriter && fencableProducer == null) {
+            try {
+                fencableProducer = createFencableProducer();
+                fencableProducer.initTransactions();
+            } catch (Exception e) {
+                if (fencableProducer != null) {
+                    Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic");
+                    fencableProducer = null;
+                }
+                throw new ConnectException("Failed to create and initialize fencable producer for config topic", e);
+            }
+        }
+    }
+
+    private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
+        Map<String, Object> producerProps = new HashMap<>(workerConfig.originals());
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
+        ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, kafkaClusterId);
+        return producerProps;
+    }
+
+    // Visible for testing
+    Map<String, Object> fencableProducerProps(DistributedConfig workerConfig) {
+        Map<String, Object> result = new HashMap<>(baseProducerProps(workerConfig));
+
+        // Always require producer acks to all to ensure durable writes
+        result.put(ProducerConfig.ACKS_CONFIG, "all");
+        // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
+        result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

Review Comment:
   Isn't ordering still guaranteed with retries when idempotency is enabled?
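   For context on the question above: Kafka's producer documentation states that enabling idempotence requires `acks=all` and `max.in.flight.requests.per.connection` of at most 5, with message ordering preserved for any allowed value. It also states that setting `transactional.id` requires idempotence. Since the fencable producer calls `initTransactions()`, it is transactional, so forcing a single in-flight request may be unnecessary. The sketch below is a hypothetical, JDK-only helper. The string keys are the real producer config names, but `FencableProducerPropsSketch` and its ordering check are illustrative. It treats an unset `enable.idempotence` as disabled, which is a simplification because that default has changed across client versions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the keys are real Kafka producer config names.
class FencableProducerPropsSketch {
    // Upper bound on in-flight requests that idempotence allows.
    static final int MAX_IDEMPOTENT_IN_FLIGHT = 5;

    static Map<String, Object> props(String transactionalId) {
        Map<String, Object> props = new HashMap<>();
        props.put("transactional.id", transactionalId);
        // A transactional producer must be idempotent.
        props.put("enable.idempotence", true);
        // Idempotence requires acks=all.
        props.put("acks", "all");
        // With idempotence, ordering is preserved for any value up to 5.
        props.put("max.in.flight.requests.per.connection", MAX_IDEMPOTENT_IN_FLIGHT);
        return props;
    }

    // True if retries cannot reorder records within a partition under this config.
    static boolean preservesOrderingOnRetry(Map<String, Object> props) {
        int inFlight = (Integer) props.getOrDefault("max.in.flight.requests.per.connection", 5);
        // Simplification: an unset value is treated as "not idempotent".
        boolean idempotent = Boolean.TRUE.equals(props.get("enable.idempotence"));
        return inFlight == 1 || (idempotent && inFlight <= MAX_IDEMPOTENT_IN_FLIGHT);
    }
}
```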



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -90,6 +89,10 @@ public interface ConfigBackingStore {
      */
     void putTargetState(String connector, TargetState state);
 
+    /**
+     * Store a new {@link SessionKey} that can be used to validate internal (i.e., non-user-triggered) inter-worker communication.

Review Comment:
   These kinds of small cleanups are really appreciated, thanks!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +845,14 @@ private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskI
         return clientOverrides;
     }
 
+    private String transactionalId(ConnectorTaskId id) {

Review Comment:
   I was confused for a moment, as I remember seeing these methods in another PR. I see this PR has conflicts, so that must be the reason; they'll disappear from here once this is rebased on trunk.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -784,6 +845,14 @@ private static Map<String, Object> connectorClientConfigOverrides(ConnectorTaskI
         return clientOverrides;
     }
 
+    private String transactionalId(ConnectorTaskId id) {
+        return transactionalId(config.groupId(), id.connector(), id.task());
+    }
+
+    public static String transactionalId(String groupId, String connector, int taskId) {

Review Comment:
   Is this going to be called from other places in the remaining PRs? If not, we could get rid of it.
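   One reason a public static overload might be kept: code outside the worker, such as whatever fences a whole connector, would need to derive every task's transactional ID from the group ID, connector name, and task count alone. For example, it might pass those IDs to the Kafka Admin client's `fenceProducers`, which takes a collection of transactional IDs. The diff does not show the method body, so the sketch below assumes IDs of the form `groupId-connector-taskId`. The `allTransactionalIds` helper and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionalIdSketch {
    // Assumed format: the real method body is not shown in the diff.
    static String transactionalId(String groupId, String connector, int taskId) {
        return groupId + "-" + connector + "-" + taskId;
    }

    // Hypothetical helper: one transactional ID per task of a connector.
    static List<String> allTransactionalIds(String groupId, String connector, int taskCount) {
        List<String> ids = new ArrayList<>(taskCount);
        for (int task = 0; task < taskCount; task++) {
            ids.add(transactionalId(groupId, connector, task));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Prints [connect-cluster-my-source-0, connect-cluster-my-source-1, connect-cluster-my-source-2]
        System.out.println(allTransactionalIds("connect-cluster", "my-source", 3));
    }
}
```

   If no caller outside `Worker` ever needs to enumerate IDs like this, the instance method alone would be enough, which is the point of the question above.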



##########
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java:
##########
@@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException {
         jaasBasicAuthFilter.filter(requestContext);
 
         verify(requestContext).abortWith(any(Response.class));
-        verify(requestContext).getMethod();
+        verify(requestContext, atLeastOnce()).getMethod();
         verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION);
     }
 
     @Test
-    public void testPostWithoutAppropriateCredential() throws IOException {
+    public void testInternalTaskConfigEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/tasks");
+    }
+
+    @Test
+    public void testInternalZombieFencingEndpointSkipped() throws IOException {
+        testInternalEndpointSkipped("connectors/connName/fence");
+    }
+
+    private void testInternalEndpointSkipped(String endpoint) throws IOException {
         UriInfo uriInfo = mock(UriInfo.class);
-        when(uriInfo.getPath()).thenReturn("connectors/connName/tasks");
+        when(uriInfo.getPath()).thenReturn(endpoint);
 
         ContainerRequestContext requestContext = mock(ContainerRequestContext.class);
         when(requestContext.getMethod()).thenReturn(HttpMethod.POST);

Review Comment:
   How does this test work with the `connectors/connName/fence` endpoint that uses `PUT`?
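   The question matters because the shared helper stubs `getMethod()` to return `POST` for both paths, while the fence endpoint is declared with `@PUT`. If the filter skips authentication only for specific method/path pairs, the fence test as written exercises a request the server never receives. Method-awareness also matters for the tasks path, since a `GET` there is an ordinary user request. The sketch below is a hypothetical, JDK-only illustration of method-aware matching. It is not the filter's actual code, and `InternalEndpoint`, `InternalEndpointMatcher`, and the path patterns are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical (HTTP method, path pattern) pair describing one internal endpoint.
final class InternalEndpoint {
    final String method;
    final Pattern path;

    InternalEndpoint(String method, String pathRegex) {
        this.method = method;
        this.path = Pattern.compile(pathRegex);
    }

    boolean matches(String requestMethod, String requestPath) {
        // Both the method and the whole path must match.
        return method.equals(requestMethod) && path.matcher(requestPath).matches();
    }
}

final class InternalEndpointMatcher {
    // Assumed rules: task configs are POSTed, fencing requests are PUT.
    private static final List<InternalEndpoint> INTERNAL = Arrays.asList(
            new InternalEndpoint("POST", "/?connectors/[^/]+/tasks/?"),
            new InternalEndpoint("PUT", "/?connectors/[^/]+/fence/?"));

    static boolean isInternalRequest(String method, String path) {
        for (InternalEndpoint endpoint : INTERNAL) {
            if (endpoint.matches(method, path)) {
                return true;
            }
        }
        return false;
    }
}
```

   Under a rule like this, the fence test would need to stub `getMethod()` with `HttpMethod.PUT`, so the helper would likely take the method as a parameter alongside the path.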



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -280,4 +326,4 @@ public int hashCode() {
                 inconsistentConnectors,
                 configTransformer);
     }
-}
+}

Review Comment:
   Nit: let's keep the trailing newline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
