Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/20 00:08:58 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10366: KAFKA-12467: Add controller-side snapshot generation

cmccabe opened a new pull request #10366:
URL: https://github.com/apache/kafka/pull/10366


   Implement controller-side snapshot generation.
   
   This is a work in progress; no tests are included yet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10366:
URL: https://github.com/apache/kafka/pull/10366


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#discussion_r606373590



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -301,6 +328,89 @@ private void appendControlEvent(String name, Runnable handler) {
         queue.append(event);
     }
 
+    private static final String GENERATE_SNAPSHOT = "generateSnapshot";
+
+    private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
+
+    class SnapshotGeneratorManager implements Runnable {
+        private final Function<Long, SnapshotWriter> writerBuilder;
+        private final ExponentialBackoff exponentialBackoff =
+            new ExponentialBackoff(10, 2, 5000, 0);
+        private SnapshotGenerator generator = null;
+
+        SnapshotGeneratorManager(Function<Long, SnapshotWriter> writerBuilder) {
+            this.writerBuilder = writerBuilder;
+        }
+
+        void createSnapshotGenerator(long epoch) {

Review comment:
       It's not technically necessary since we have other checks, but I will add a check.  In general this situation should not occur.
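
A minimal sketch of the kind of check discussed here. The thread does not show the check that was actually added, so everything below is an assumption: `EpochGuard`, `registerEpoch`, and the tracked set of epochs are hypothetical names, not part of `QuorumController`.

```java
import java.util.TreeSet;

// Hypothetical stand-in; not the QuorumController code from the pull request.
final class EpochGuard {
    // Epochs for which an in-memory snapshot is assumed to exist.
    private final TreeSet<Long> knownEpochs = new TreeSet<>();
    // Epoch of the snapshot currently being generated, or -1 if there is none.
    private long activeEpoch = -1;

    void registerEpoch(long epoch) {
        knownEpochs.add(epoch);
    }

    void createSnapshotGenerator(long epoch) {
        // Refuse to start a second generator while one is still running.
        if (activeEpoch >= 0) {
            throw new IllegalStateException(
                "Snapshot generator already exists for epoch " + activeEpoch);
        }
        // Refuse epochs that were never registered.
        if (!knownEpochs.contains(epoch)) {
            throw new IllegalArgumentException("Unknown snapshot epoch " + epoch);
        }
        activeEpoch = epoch;
    }

    long activeEpoch() {
        return activeEpoch;
    }
}
```

Failing fast like this turns a situation that "should not occur" into an explicit error instead of a silently inconsistent snapshot.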







[GitHub] [kafka] mumrah commented on a change in pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#discussion_r606355983



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
##########
@@ -272,4 +276,44 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va
 
         return ApiError.NONE;
     }
+
+    class ClientQuotaControlIterator implements Iterator<List<ApiMessageAndVersion>> {
+        private final long epoch;
+        private final Iterator<Entry<ClientQuotaEntity, TimelineHashMap<String, Double>>> iterator;
+
+        ClientQuotaControlIterator(long epoch) {
+            this.epoch = epoch;
+            this.iterator = clientQuotaData.entrySet(epoch).iterator();

Review comment:
       Since we're getting the entry set of a particular epoch, does this mean we'll be getting consistent snapshots?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -301,6 +328,89 @@ private void appendControlEvent(String name, Runnable handler) {
         queue.append(event);
     }
 
+    private static final String GENERATE_SNAPSHOT = "generateSnapshot";
+
+    private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
+
+    class SnapshotGeneratorManager implements Runnable {
+        private final Function<Long, SnapshotWriter> writerBuilder;
+        private final ExponentialBackoff exponentialBackoff =
+            new ExponentialBackoff(10, 2, 5000, 0);
+        private SnapshotGenerator generator = null;
+
+        SnapshotGeneratorManager(Function<Long, SnapshotWriter> writerBuilder) {
+            this.writerBuilder = writerBuilder;
+        }
+
+        void createSnapshotGenerator(long epoch) {

Review comment:
       Do we need to check to see if this epoch is known or valid? 

##########
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.slf4j.Logger;
+
+
+final class SnapshotGenerator {
+    static class Section {
+        private final String name;
+        private final Iterator<List<ApiMessageAndVersion>> iterator;
+
+        Section(String name, Iterator<List<ApiMessageAndVersion>> iterator) {
+            this.name = name;
+            this.iterator = iterator;
+        }
+
+        String name() {
+            return name;
+        }
+
+        Iterator<List<ApiMessageAndVersion>> iterator() {
+            return iterator;
+        }
+    }
+
+    private final Logger log;
+    private final SnapshotWriter writer;
+    private final int maxBatchesPerGenerateCall;
+    private final ExponentialBackoff exponentialBackoff;
+    private final List<Section> sections;
+    private final Iterator<Section> sectionIterator;
+    private Iterator<List<ApiMessageAndVersion>> batchIterator;
+    private List<ApiMessageAndVersion> batch;
+    private Section section;
+    private long numRecords;
+    private long numWriteTries;
+
+    SnapshotGenerator(LogContext logContext,
+                      SnapshotWriter writer,
+                      int maxBatchesPerGenerateCall,
+                      ExponentialBackoff exponentialBackoff,
+                      List<Section> sections) {
+        this.log = logContext.logger(SnapshotGenerator.class);
+        this.writer = writer;
+        this.maxBatchesPerGenerateCall = maxBatchesPerGenerateCall;
+        this.exponentialBackoff = exponentialBackoff;
+        this.sections = sections;
+        this.sectionIterator = this.sections.iterator();
+        this.batchIterator = Collections.emptyIterator();
+        this.batch = null;
+        this.section = null;
+        this.numRecords = 0;
+        this.numWriteTries = 0;
+    }
+
+    /**
+     * Returns the epoch of the snapshot that we are generating.
+     */
+    long epoch() {
+        return writer.epoch();
+    }
+
+    SnapshotWriter writer() {
+        return writer;
+    }
+
+    /**
+     * Generate the next batch of records.
+     *
+     * @return  0 if a batch was sent to the writer,
+     *          -1 if there are no more batches to generate,
+     *          or the number of times we tried to write and the writer
+     *          was busy, otherwise.
+     */
+    private long generateBatch() throws Exception {
+        if (batch == null) {
+            while (!batchIterator.hasNext()) {
+                if (section != null) {
+                    log.info("Generated {} record(s) for the {} section of snapshot {}.",
+                             numRecords, section.name(), writer.epoch());
+                    section = null;
+                    numRecords = 0;
+                }
+                if (!sectionIterator.hasNext()) {
+                    writer.completeSnapshot();
+                    return -1;
+                }
+                section = sectionIterator.next();
+                log.info("Generating records for the {} section of snapshot {}.",
+                         section.name(), writer.epoch());
+                batchIterator = section.iterator();
+            }
+            batch = batchIterator.next();
+        }
+        if (writer.writeBatch(batch)) {
+            numRecords += batch.size();
+            numWriteTries = 0;
+            batch = null;
+            return 0;
+        } else {
+            return ++numWriteTries;
+        }
+    }
+
+    /**
+     * Generate the next few batches of records.
+     *
+     * @return  The number of nanoseconds to delay before rescheduling the
+     *          generateBatches event, or empty if the snapshot is done.
+     */
+    OptionalLong generateBatches() throws Exception {
+        for (int numBatches = 0; numBatches < maxBatchesPerGenerateCall; numBatches++) {
+            long result = generateBatch();
+            if (result < 0) {
+                return OptionalLong.empty();
+            } else if (result > 0) {
+                return OptionalLong.of(exponentialBackoff.backoff(numWriteTries - 1));

Review comment:
       Should we use `result` here instead of `numWriteTries`? If not, maybe we can change `generateBatch()` to return a boolean instead.







[GitHub] [kafka] cmccabe commented on a change in pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#discussion_r606372293



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
##########
@@ -272,4 +276,44 @@ private ApiError validateEntity(ClientQuotaEntity entity, Map<String, String> va
 
         return ApiError.NONE;
     }
+
+    class ClientQuotaControlIterator implements Iterator<List<ApiMessageAndVersion>> {
+        private final long epoch;
+        private final Iterator<Entry<ClientQuotaEntity, TimelineHashMap<String, Double>>> iterator;
+
+        ClientQuotaControlIterator(long epoch) {
+            this.epoch = epoch;
+            this.iterator = clientQuotaData.entrySet(epoch).iterator();

Review comment:
       Yes, snapshots are consistent.
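
To illustrate what "consistent" means for an iterator created from `entrySet(epoch)`, here is a toy sketch. It is an assumption-laden illustration only: `EpochedMap` is a hypothetical class that copies the whole map per epoch, which is not how `TimelineHashMap` is implemented. The point is just the observable behavior: writes made after the epoch are invisible to an iterator over that epoch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy illustration only; this is not how TimelineHashMap works internally.
final class EpochedMap<K, V> {
    private final Map<K, V> current = new HashMap<>();
    private final TreeMap<Long, Map<K, V>> snapshots = new TreeMap<>();

    void put(K key, V value) {
        current.put(key, value);
    }

    // Freeze the current contents under the given epoch.
    void createSnapshot(long epoch) {
        snapshots.put(epoch, Collections.unmodifiableMap(new HashMap<>(current)));
    }

    // Entries exactly as they were when the snapshot for this epoch was taken.
    Set<Map.Entry<K, V>> entrySet(long epoch) {
        Map<K, V> frozen = snapshots.get(epoch);
        if (frozen == null) {
            throw new IllegalArgumentException("No snapshot for epoch " + epoch);
        }
        return frozen.entrySet();
    }

    int size() {
        return current.size();
    }
}
```

An iterator obtained from `entrySet(1L)` keeps returning the epoch-1 contents even if `put` is called while the iteration is still in progress.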







[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#issuecomment-813567129


   Fix spotbugs





[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#issuecomment-811503450


   I split off parts of this into:
   https://github.com/apache/kafka/pull/10455
   https://github.com/apache/kafka/pull/10456





[GitHub] [kafka] cmccabe commented on a change in pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#discussion_r606374075



##########
File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+
+import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.slf4j.Logger;
+
+
+final class SnapshotGenerator {
+    static class Section {
+        private final String name;
+        private final Iterator<List<ApiMessageAndVersion>> iterator;
+
+        Section(String name, Iterator<List<ApiMessageAndVersion>> iterator) {
+            this.name = name;
+            this.iterator = iterator;
+        }
+
+        String name() {
+            return name;
+        }
+
+        Iterator<List<ApiMessageAndVersion>> iterator() {
+            return iterator;
+        }
+    }
+
+    private final Logger log;
+    private final SnapshotWriter writer;
+    private final int maxBatchesPerGenerateCall;
+    private final ExponentialBackoff exponentialBackoff;
+    private final List<Section> sections;
+    private final Iterator<Section> sectionIterator;
+    private Iterator<List<ApiMessageAndVersion>> batchIterator;
+    private List<ApiMessageAndVersion> batch;
+    private Section section;
+    private long numRecords;
+    private long numWriteTries;
+
+    SnapshotGenerator(LogContext logContext,
+                      SnapshotWriter writer,
+                      int maxBatchesPerGenerateCall,
+                      ExponentialBackoff exponentialBackoff,
+                      List<Section> sections) {
+        this.log = logContext.logger(SnapshotGenerator.class);
+        this.writer = writer;
+        this.maxBatchesPerGenerateCall = maxBatchesPerGenerateCall;
+        this.exponentialBackoff = exponentialBackoff;
+        this.sections = sections;
+        this.sectionIterator = this.sections.iterator();
+        this.batchIterator = Collections.emptyIterator();
+        this.batch = null;
+        this.section = null;
+        this.numRecords = 0;
+        this.numWriteTries = 0;
+    }
+
+    /**
+     * Returns the epoch of the snapshot that we are generating.
+     */
+    long epoch() {
+        return writer.epoch();
+    }
+
+    SnapshotWriter writer() {
+        return writer;
+    }
+
+    /**
+     * Generate the next batch of records.
+     *
+     * @return  0 if a batch was sent to the writer,
+     *          -1 if there are no more batches to generate,
+     *          or the number of times we tried to write and the writer
+     *          was busy, otherwise.
+     */
+    private long generateBatch() throws Exception {
+        if (batch == null) {
+            while (!batchIterator.hasNext()) {
+                if (section != null) {
+                    log.info("Generated {} record(s) for the {} section of snapshot {}.",
+                             numRecords, section.name(), writer.epoch());
+                    section = null;
+                    numRecords = 0;
+                }
+                if (!sectionIterator.hasNext()) {
+                    writer.completeSnapshot();
+                    return -1;
+                }
+                section = sectionIterator.next();
+                log.info("Generating records for the {} section of snapshot {}.",
+                         section.name(), writer.epoch());
+                batchIterator = section.iterator();
+            }
+            batch = batchIterator.next();
+        }
+        if (writer.writeBatch(batch)) {
+            numRecords += batch.size();
+            numWriteTries = 0;
+            batch = null;
+            return 0;
+        } else {
+            return ++numWriteTries;
+        }
+    }
+
+    /**
+     * Generate the next few batches of records.
+     *
+     * @return  The number of nanoseconds to delay before rescheduling the
+     *          generateBatches event, or empty if the snapshot is done.
+     */
+    OptionalLong generateBatches() throws Exception {
+        for (int numBatches = 0; numBatches < maxBatchesPerGenerateCall; numBatches++) {
+            long result = generateBatch();
+            if (result < 0) {
+                return OptionalLong.empty();
+            } else if (result > 0) {
+                return OptionalLong.of(exponentialBackoff.backoff(numWriteTries - 1));

Review comment:
       It's fine to use `result` here, I guess.
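
A minimal sketch of the loop with the backoff driven by `result`, as suggested in the review. Several pieces are assumptions for illustration: `SimpleBackoff` is a hypothetical stand-in (not Kafka's `ExponentialBackoff`), `BatchLoop` is a hypothetical holder class, and returning a zero delay after the loop is a guess, since the quoted diff is cut off before that point.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the backoff policy; not Kafka's ExponentialBackoff.
final class SimpleBackoff {
    private final long initial;
    private final long max;

    SimpleBackoff(long initial, long max) {
        this.initial = initial;
        this.max = max;
    }

    // Delay after the given number of earlier failed attempts, doubling each time up to max.
    long backoff(long attempts) {
        int shift = (int) Math.min(attempts, 20L); // bound the shift to avoid overflow
        return Math.min(max, initial << shift);
    }
}

final class BatchLoop {
    // generateBatch follows the contract quoted above:
    // 0 = batch written, -1 = no more batches, >0 = number of busy write attempts.
    static OptionalLong generateBatches(LongSupplier generateBatch,
                                        int maxBatches,
                                        SimpleBackoff backoff) {
        for (int i = 0; i < maxBatches; i++) {
            long result = generateBatch.getAsLong();
            if (result < 0) {
                return OptionalLong.empty();
            } else if (result > 0) {
                // Use the returned try count directly instead of reading a field.
                return OptionalLong.of(backoff.backoff(result - 1));
            }
        }
        // Assumption: more work remains, so reschedule without delay.
        return OptionalLong.of(0);
    }
}
```

Using the return value keeps the caller independent of the generator's internal counter, which is the main appeal of the reviewer's suggestion.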







[GitHub] [kafka] cmccabe commented on pull request #10366: KAFKA-12467: Add controller-side snapshot generation

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10366:
URL: https://github.com/apache/kafka/pull/10366#issuecomment-812667473


   > Returning an iterator of batches works well when the control managers only have a single iterable thing that needs to be translated to records. Do you think we might have cases (eventually) where the control classes have more complex state? It might be more flexible to take a consumer rather than return an iterable:
   
   Hmm... I don't think that would be a problem.  You could have a single iterator that iterates over multiple data structures X and Y.  You would just need to store the state of whether you were done with X before starting Y.
   
   In general one of the goals of controller snapshots is to avoid "stopping the world" while the snapshot is generated.  So a synchronous API would not be useful here.  The iterator API allows us to take only a few batches of records at a time.
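
The two points above can be sketched roughly as follows. This is an illustration under stated assumptions, not code from the pull request: `TwoStructureIterator` and `IncrementalDrain` are hypothetical names, records are plain strings rather than `ApiMessageAndVersion`, and `batchSize` is assumed to be positive. The iterator finishes structure X before starting Y, and the caller pulls only a bounded number of batches per call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical example; names and String records are illustrative only.
final class TwoStructureIterator implements Iterator<List<String>> {
    private final Iterator<String> x;
    private final Iterator<String> y;
    private final int batchSize;

    TwoStructureIterator(List<String> x, List<String> y, int batchSize) {
        this.x = x.iterator();
        this.y = y.iterator();
        this.batchSize = batchSize;
    }

    @Override
    public boolean hasNext() {
        return x.hasNext() || y.hasNext();
    }

    @Override
    public List<String> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // "Done with X" is tracked implicitly: Y is used only once X is exhausted.
        Iterator<String> source = x.hasNext() ? x : y;
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && source.hasNext()) {
            batch.add(source.next());
        }
        return batch;
    }
}

final class IncrementalDrain {
    // Take at most maxBatches batches, then return so other events can run.
    static int drain(Iterator<List<String>> batches,
                     int maxBatches,
                     List<List<String>> out) {
        int taken = 0;
        while (taken < maxBatches && batches.hasNext()) {
            out.add(batches.next());
            taken++;
        }
        return taken;
    }
}
```

Calling `drain` repeatedly with a small `maxBatches` spreads the work over many short steps, which is the property a synchronous consumer-style API would not give.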

