You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/07 21:39:29 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10343: KAFKA-12471: Implement createPartitions in KIP-500 mode

hachikuji commented on a change in pull request #10343:
URL: https://github.com/apache/kafka/pull/10343#discussion_r609081390



##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1007,6 +1014,112 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) {
         return ControllerResult.of(records, null);
     }
 
+    ControllerResult<List<CreatePartitionsTopicResult>>
+            createPartitions(List<CreatePartitionsTopic> topics) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<CreatePartitionsTopicResult> results = new ArrayList<>();
+        for (CreatePartitionsTopic topic : topics) {
+            ApiError apiError = ApiError.NONE;
+            try {
+                createPartitions(topic, records);
+            } catch (ApiException e) {
+                apiError = ApiError.fromThrowable(e);
+            } catch (Exception e) {
+                log.error("Unexpected createPartitions error for {}", topic, e);
+                apiError = ApiError.fromThrowable(e);
+            }
+            results.add(new CreatePartitionsTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
+        }
+        return new ControllerResult<>(records, results, true);
+    }
+
+    void createPartitions(CreatePartitionsTopic topic,
+                          List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic.name());
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        if (topic.count() == topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("Topic already has " +
+                topicInfo.parts.size() + " partition(s).");
+        } else if (topic.count() < topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
+                "has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
+                " would not be an increase.");
+        }
+        int additional = topic.count() - topicInfo.parts.size();
+        if (topic.assignments() != null) {
+            if (topic.assignments().size() != additional) {
+                throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
+                    " additional partition(s), but only " + topic.assignments().size() +
+                    " assignment(s) were specified.");
+            }
+        }
+        Iterator<PartitionControlInfo> iterator = topicInfo.parts.values().iterator();
+        if (!iterator.hasNext()) {
+            throw new UnknownServerException("Invalid state: topic " + topic.name() +
+                " appears to have no partitions.");
+        }
+        PartitionControlInfo partitionInfo = iterator.next();
+        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
+            throw new UnknownServerException("Invalid replication factor " +
+                partitionInfo.replicas.length + ": expected a number less than 65536.");
+        }
+        short replicationFactor = (short) partitionInfo.replicas.length;
+        int startPartitionId = topicInfo.parts.size();
+
+        List<List<Integer>> placements = null;
+        if (topic.assignments() != null) {
+            placements = new ArrayList<>();
+            for (CreatePartitionsAssignment assignment : topic.assignments()) {

Review comment:
       nit: it would improve readability to factor out some functions for some of the work here. Here we can have a separate function with a nice name for building the assignments

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -538,4 +542,57 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
       })
   }
+
+  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+    val responses = createPartitions(request.body[CreatePartitionsRequest].data,
+      authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new CreatePartitionsResponseData().
+        setResults(responses).
+        setThrottleTimeMs(throttleTimeMs)
+      new CreatePartitionsResponse(responseData)
+    })
+  }
+
+  def createPartitions(request: CreatePartitionsRequestData,
+                       hasClusterAuth: Boolean,
+                       getCreatableTopics: Iterable[String] => Set[String]): util.List[CreatePartitionsTopicResult] = {
+    val responses = new util.ArrayList[CreatePartitionsTopicResult]()
+    val duplicateTopicNames = new util.HashSet[String]()
+    val topicNames = new util.HashSet[String]()
+    request.topics().forEach {
+      topic =>
+        if (!topicNames.add(topic.name())) {
+          duplicateTopicNames.add(topic.name())
+        }
+    }
+    duplicateTopicNames.forEach {
+      case topicName => responses.add(new CreatePartitionsTopicResult().

Review comment:
       nit: the `case` is not needed. usually we put the argument on the previous line.
   ```scala
       duplicateTopicNames.forEach { topicName =>
   ```
   Same in the `topicNames` loop below.

##########
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##########
@@ -538,4 +542,57 @@ class ControllerApis(val requestChannel: RequestChannel,
         }
       })
   }
+
+  def handleCreatePartitions(request: RequestChannel.Request): Unit = {
+    val responses = createPartitions(request.body[CreatePartitionsRequest].data,
+      authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME),
+      names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, names)(n => n))
+    requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+      val responseData = new CreatePartitionsResponseData().
+        setResults(responses).
+        setThrottleTimeMs(throttleTimeMs)
+      new CreatePartitionsResponse(responseData)
+    })
+  }
+
+  def createPartitions(request: CreatePartitionsRequestData,
+                       hasClusterAuth: Boolean,
+                       getCreatableTopics: Iterable[String] => Set[String]): util.List[CreatePartitionsTopicResult] = {
+    val responses = new util.ArrayList[CreatePartitionsTopicResult]()
+    val duplicateTopicNames = new util.HashSet[String]()
+    val topicNames = new util.HashSet[String]()
+    request.topics().forEach {
+      topic =>
+        if (!topicNames.add(topic.name())) {
+          duplicateTopicNames.add(topic.name())
+        }
+    }
+    duplicateTopicNames.forEach {
+      case topicName => responses.add(new CreatePartitionsTopicResult().
+        setName(topicName).
+        setErrorCode(INVALID_REQUEST.code()).
+        setErrorMessage("Duplicate topic name."))
+        topicNames.remove(topicName)
+    }
+    val authorizedTopicNames = {
+      if (hasClusterAuth) {
+        topicNames.asScala
+      } else {
+        getCreatableTopics(topicNames.asScala)
+      }
+    }
+    val topics = new util.ArrayList[CreatePartitionsTopic]
+    topicNames.forEach {
+      case topicName => if (authorizedTopicNames.contains(topicName)) {
+        topics.add(request.topics().find(topicName))
+      } else {
+        responses.add(new CreatePartitionsTopicResult().
+          setName(topicName).
+          setErrorCode(TOPIC_AUTHORIZATION_FAILED.code()))
+      }
+    }
+    val results = controller.createPartitions(topics).get()

Review comment:
       Why do we need to block?

##########
File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -518,4 +523,89 @@ public void testDeleteTopics() throws Exception {
                 Long.MAX_VALUE, Collections.singleton("foo")));
         assertEmptyTopicConfigs(ctx, "foo");
     }
+
+
+    @Test
+    public void testCreatePartitions() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        request.topics().add(new CreatableTopic().setName("foo").
+            setNumPartitions(3).setReplicationFactor((short) 2));
+        request.topics().add(new CreatableTopic().setName("bar").
+            setNumPartitions(4).setReplicationFactor((short) 2));
+        request.topics().add(new CreatableTopic().setName("quux").
+            setNumPartitions(2).setReplicationFactor((short) 2));
+        request.topics().add(new CreatableTopic().setName("foo2").
+            setNumPartitions(2).setReplicationFactor((short) 2));
+        registerBroker(0, ctx);
+        unfenceBroker(0, ctx);
+        registerBroker(1, ctx);
+        unfenceBroker(1, ctx);
+        ControllerResult<CreateTopicsResponseData> createTopicResult =
+            replicationControl.createTopics(request);
+        ctx.replay(createTopicResult.records());
+        List<CreatePartitionsTopic> topics = new ArrayList<>();
+        topics.add(new CreatePartitionsTopic().
+            setName("foo").setCount(5).setAssignments(null));
+        topics.add(new CreatePartitionsTopic().
+            setName("bar").setCount(3).setAssignments(null));
+        topics.add(new CreatePartitionsTopic().
+            setName("baz").setCount(3).setAssignments(null));
+        topics.add(new CreatePartitionsTopic().
+            setName("quux").setCount(2).setAssignments(null));
+        ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
+            replicationControl.createPartitions(topics);
+        assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
+                setName("foo").
+                setErrorCode(NONE.code()).
+                setErrorMessage(null),
+            new CreatePartitionsTopicResult().
+                setName("bar").
+                setErrorCode(INVALID_PARTITIONS.code()).
+                setErrorMessage("The topic bar currently has 4 partition(s); 3 would not be an increase."),
+            new CreatePartitionsTopicResult().
+                setName("baz").
+                setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                setErrorMessage(null),
+            new CreatePartitionsTopicResult().
+                setName("quux").
+                setErrorCode(INVALID_PARTITIONS.code()).
+                setErrorMessage("Topic already has 2 partition(s).")),
+            createPartitionsResult.response());
+        ctx.replay(createPartitionsResult.records());
+        List<CreatePartitionsTopic> topics2 = new ArrayList<>();
+        topics2.add(new CreatePartitionsTopic().

Review comment:
       It's easier to follow these tests when they are broken down into smaller cases. That also gives us an opportunity to provide an opportunity to give a good name to the case we're verifying. For example, `testCreatePartitionsWithInvalidReplicaAssignment` or `testCreatePartitionsWithSmallerPartitionCount`. 

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -1007,6 +1014,112 @@ int bestLeader(int[] replicas, int[] isr, boolean unclean) {
         return ControllerResult.of(records, null);
     }
 
+    ControllerResult<List<CreatePartitionsTopicResult>>
+            createPartitions(List<CreatePartitionsTopic> topics) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        List<CreatePartitionsTopicResult> results = new ArrayList<>();
+        for (CreatePartitionsTopic topic : topics) {
+            ApiError apiError = ApiError.NONE;
+            try {
+                createPartitions(topic, records);
+            } catch (ApiException e) {
+                apiError = ApiError.fromThrowable(e);
+            } catch (Exception e) {
+                log.error("Unexpected createPartitions error for {}", topic, e);
+                apiError = ApiError.fromThrowable(e);
+            }
+            results.add(new CreatePartitionsTopicResult().
+                setName(topic.name()).
+                setErrorCode(apiError.error().code()).
+                setErrorMessage(apiError.message()));
+        }
+        return new ControllerResult<>(records, results, true);
+    }
+
+    void createPartitions(CreatePartitionsTopic topic,
+                          List<ApiMessageAndVersion> records) {
+        Uuid topicId = topicsByName.get(topic.name());
+        if (topicId == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        TopicControlInfo topicInfo = topics.get(topicId);
+        if (topicInfo == null) {
+            throw new UnknownTopicOrPartitionException();
+        }
+        if (topic.count() == topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("Topic already has " +
+                topicInfo.parts.size() + " partition(s).");
+        } else if (topic.count() < topicInfo.parts.size()) {
+            throw new InvalidPartitionsException("The topic " + topic.name() + " currently " +
+                "has " + topicInfo.parts.size() + " partition(s); " + topic.count() +
+                " would not be an increase.");
+        }
+        int additional = topic.count() - topicInfo.parts.size();
+        if (topic.assignments() != null) {
+            if (topic.assignments().size() != additional) {
+                throw new InvalidReplicaAssignmentException("Attempted to add " + additional +
+                    " additional partition(s), but only " + topic.assignments().size() +
+                    " assignment(s) were specified.");
+            }
+        }
+        Iterator<PartitionControlInfo> iterator = topicInfo.parts.values().iterator();
+        if (!iterator.hasNext()) {
+            throw new UnknownServerException("Invalid state: topic " + topic.name() +
+                " appears to have no partitions.");
+        }
+        PartitionControlInfo partitionInfo = iterator.next();
+        if (partitionInfo.replicas.length > Short.MAX_VALUE) {
+            throw new UnknownServerException("Invalid replication factor " +
+                partitionInfo.replicas.length + ": expected a number less than 65536.");

Review comment:
       nit: `Short.MAX_VALUE` is 32767




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org