Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/04 10:30:38 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #13796: [Broker]support split bundle by specified boundaries

codelipenghui commented on a change in pull request #13796:
URL: https://github.com/apache/pulsar/pull/13796#discussion_r819416887



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -570,18 +571,34 @@ public void splitNamespaceBundle(
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
-            @QueryParam("splitAlgorithmName") String splitAlgorithmName) {
-
+            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
+            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
+            internalSplitNamespaceBundle(asyncResponse,
+                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
+    @ApiOperation(value = "Get hash positions for topics")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public TopicHashPositions getTopicHashPositions(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("bundle") String bundleRange,
+            @QueryParam("topicList") List<String> topicList) {
+            validateNamespaceName(tenant, namespace);

Review comment:
       We should receive encoded topic names
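
   As a rough illustration of why encoding matters here: a fully qualified topic name contains `:` and `/`, so it cannot travel as a query parameter unchanged. The sketch below uses only the JDK's `URLEncoder` and `URLDecoder`; the class and method names are hypothetical, and the helper Pulsar itself uses for this may behave differently.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Pulsar: round-trips one topic name.
final class TopicNameCodecSketch {
    private TopicNameCodecSketch() {
    }

    // Client side: make the topic name safe to place in a query string.
    static String encode(String topicName) {
        return URLEncoder.encode(topicName, StandardCharsets.UTF_8);
    }

    // Broker side: recover the original topic name from the received value.
    static String decode(String encodedTopicName) {
        return URLDecoder.decode(encodedTopicName, StandardCharsets.UTF_8);
    }
}
```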

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicHashPositions.java
##########
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data

Review comment:
       If we don't need setters, we can make the object immutable.
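
   A minimal sketch of an immutable holder, written without Lombok so it compiles on its own. The class name and the three fields are assumptions for illustration; the actual fields of `TopicHashPositions` are not shown in this diff. Whether the JSON mapping used by the admin client can populate such a class is not addressed here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the data class under review; field names are assumed.
final class ImmutableTopicHashPositionsSketch {
    private final String namespace;
    private final String bundle;
    private final Map<String, Long> hashPositions;

    ImmutableTopicHashPositionsSketch(String namespace, String bundle, Map<String, Long> hashPositions) {
        this.namespace = namespace;
        this.bundle = bundle;
        // Defensive copy, so later changes to the caller's map are not visible here.
        this.hashPositions = Collections.unmodifiableMap(new LinkedHashMap<>(hashPositions));
    }

    String getNamespace() {
        return namespace;
    }

    String getBundle() {
        return bundle;
    }

    // The returned view rejects put/remove with UnsupportedOperationException.
    Map<String, Long> getHashPositions() {
        return hashPositions;
    }
}
```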

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
##########
@@ -2061,10 +2062,11 @@ void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffi
      * @param bundle range of bundle to split
      * @param unloadSplitBundles
      * @param splitAlgorithmName
+     * @param splitBoundaries
      * @throws PulsarAdminException
      */
-    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles, String splitAlgorithmName)
-            throws PulsarAdminException;
+    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,

Review comment:
       We can't change the existing method signature directly; that would be a breaking API change.
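
   One common way to avoid the break is to keep the old method and add an overload with a default body. The interface below is a simplified, hypothetical stand-in (exception declarations are omitted and the name `NamespacesSketch` is made up); it is only meant to show the shape, not what the PR should contain.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the admin interface; not the real Pulsar API.
interface NamespacesSketch {
    // Existing signature stays untouched, so current callers and implementers keep compiling.
    void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                              String splitAlgorithmName);

    // New overload carries the extra argument. The default body keeps old implementers valid.
    default void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
                                      String splitAlgorithmName, List<Long> splitBoundaries) {
        if (splitBoundaries == null || splitBoundaries.isEmpty()) {
            // No boundaries given: behave exactly like the existing method.
            splitNamespaceBundle(namespace, bundle, unloadSplitBundles, splitAlgorithmName);
            return;
        }
        throw new UnsupportedOperationException("splitBoundaries is not supported by this implementation");
    }
}
```

   An implementation that understands boundaries overrides the five-argument method; everything else keeps working unchanged.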

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -570,18 +571,34 @@ public void splitNamespaceBundle(
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
-            @QueryParam("splitAlgorithmName") String splitAlgorithmName) {
-
+            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
+            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
+            internalSplitNamespaceBundle(asyncResponse,
+                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
+    @ApiOperation(value = "Get hash positions for topics")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public TopicHashPositions getTopicHashPositions(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("bundle") String bundleRange,
+            @QueryParam("topicList") List<String> topicList) {

Review comment:
       It would also be better to use `topics`, to stay consistent with the existing REST API definitions, for example `@Path("/{tenant}/{namespace}/topics")`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1171,6 +1173,51 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
         });
     }
 
+    protected TopicHashPositions internalGetTopicHashPositions(String bundleRange, List<String> topicList) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topicList, bundleRange);
+        }
+        if (topicList == null || topicList.size() == 0) {

Review comment:
       If `topicList` is empty, should we return all the topics under the namespace?
   We should also check that all the given topics are under the namespace.
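
   A sketch of both points, assuming fully qualified topic names of the form `domain://tenant/namespace/local-name`. The class, the method names, and the choice to fail with `IllegalArgumentException` are all hypothetical; `allTopics` stands for whatever full list the broker has already loaded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pulsar.
final class TopicSelectionSketch {
    private TopicSelectionSketch() {
    }

    // Returns every known topic when none were requested; otherwise returns the
    // requested topics after checking that each one belongs to the namespace.
    static List<String> select(String namespace, List<String> requested, List<String> allTopics) {
        if (requested == null || requested.isEmpty()) {
            return new ArrayList<>(allTopics);
        }
        List<String> selected = new ArrayList<>();
        for (String topic : requested) {
            if (!belongsTo(namespace, topic)) {
                throw new IllegalArgumentException("Topic " + topic + " is not in namespace " + namespace);
            }
            selected.add(topic);
        }
        return selected;
    }

    // True when the part after "://" starts with "<namespace>/".
    static boolean belongsTo(String namespace, String topic) {
        int schemeEnd = topic.indexOf("://");
        return schemeEnd >= 0 && topic.startsWith(namespace + "/", schemeEnd + 3);
    }
}
```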

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1171,6 +1173,51 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
         });
     }
 
+    protected TopicHashPositions internalGetTopicHashPositions(String bundleRange, List<String> topicList) {

Review comment:
       It's better to make this fully async, to avoid blocking Jetty threads.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -1171,6 +1173,51 @@ protected void internalSplitNamespaceBundle(AsyncResponse asyncResponse, String
         });
     }
 
+    protected TopicHashPositions internalGetTopicHashPositions(String bundleRange, List<String> topicList) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Getting hash position for topic list {}, bundle {}", clientAppId(), topicList, bundleRange);
+        }
+        if (topicList == null || topicList.size() == 0) {
+            return null;
+        }
+        validateNamespacePolicyOperation(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ);
+        Policies policies = getNamespacePolicies(namespaceName);
+        NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                false, true);
+        try {
+            List<String> allTopicsInThisBundle =
+                    pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).get();
+            if (allTopicsInThisBundle.size() == 0) {
+                return null;
+            }
+            Map<String, Long> topicHashPositions = new HashMap<>();
+            for (String topic : topicList) {
+                // partitioned topic
+                if (TopicName.get(topic).getPartitionIndex() == -1) {
+                    allTopicsInThisBundle.stream()
+                            .filter(t -> TopicName.get(t).getPartitionedTopicName()
+                                    .equals(TopicName.get(topic).getPartitionedTopicName()))
+                            .forEach(partition -> {
+                                topicHashPositions.put(partition,  pulsar().getNamespaceService()
+                                        .getNamespaceBundleFactory().getLongHashCode(partition));
+                            });
+                } else { // topic partition
+                    if (allTopicsInThisBundle.contains(topic)) {

Review comment:
       We should convert to `TopicName` first; the given topic name might be a short topic name.
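
   To show the failure mode: a plain `contains` on the owned-topic list misses a short name such as `my-topic`, because the list holds fully qualified names. The helper below is a deliberately simplified, hypothetical stand-in for `TopicName` parsing; in particular, which namespace a short name should resolve to is an assumption of this sketch.

```java
import java.util.List;

// Hypothetical, simplified stand-in for TopicName-based normalization.
final class TopicNameNormalizerSketch {
    private TopicNameNormalizerSketch() {
    }

    // Expands a short name to a fully qualified one; leaves full names unchanged.
    static String toFullName(String topic, String namespace) {
        if (topic.contains("://")) {
            return topic;
        }
        // Assumption: a short name refers to a persistent topic in the given namespace.
        return "persistent://" + namespace + "/" + topic;
    }

    // Compares on the normalized name instead of the raw user input.
    static boolean containsTopic(List<String> ownedTopics, String topic, String namespace) {
        return ownedTopics.contains(toFullName(topic, namespace));
    }
}
```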

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/BundleSplitOption.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+
+@Getter
+@Setter

Review comment:
       Do we need setters here? It's better to keep this immutable.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
##########
@@ -651,18 +652,34 @@ public void splitNamespaceBundle(
             @PathParam("namespace") String namespace,
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @QueryParam("unload") @DefaultValue("false") boolean unload) {
+            @QueryParam("unload") @DefaultValue("false") boolean unload,
+            @QueryParam("splitBoundaries") @DefaultValue("") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(property, cluster, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload,
-                    NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME);
+            internalSplitNamespaceBundle(asyncResponse, bundleRange,
+                    authoritative, unload, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME, splitBoundaries);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/{bundle}/topicHashPositions")
+    @ApiOperation(value = "Get hash positions for topics")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public TopicHashPositions getTopicHashPositions(

Review comment:
       The same comments as in `v2/Namespaces` apply here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -570,18 +571,34 @@ public void splitNamespaceBundle(
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
-            @QueryParam("splitAlgorithmName") String splitAlgorithmName) {
-
+            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
+            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
+            internalSplitNamespaceBundle(asyncResponse,
+                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);

Review comment:
       We should not allow the `specified_positions_divide` algorithm to be used with empty `splitBoundaries`.
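
   A sketch of such a check at the request level. The algorithm name is taken from this comment; the class name, the constant, and the use of `IllegalArgumentException` are assumptions (the broker would more likely map this to a REST error).

```java
import java.util.List;

// Hypothetical request-level validation, not part of Pulsar.
final class SplitRequestValidatorSketch {
    // Algorithm name as mentioned in the review comment; treated as an assumption here.
    static final String SPECIFIED_POSITIONS_DIVIDE = "specified_positions_divide";

    private SplitRequestValidatorSketch() {
    }

    // Rejects only the invalid combination; other algorithms may omit boundaries.
    static void validate(String splitAlgorithmName, List<Long> splitBoundaries) {
        boolean noBoundaries = splitBoundaries == null || splitBoundaries.isEmpty();
        if (SPECIFIED_POSITIONS_DIVIDE.equals(splitAlgorithmName) && noBoundaries) {
            throw new IllegalArgumentException(
                    "splitBoundaries must not be empty when using " + SPECIFIED_POSITIONS_DIVIDE);
        }
    }
}
```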

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -570,18 +571,34 @@ public void splitNamespaceBundle(
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
-            @QueryParam("splitAlgorithmName") String splitAlgorithmName) {
-
+            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
+            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
+            internalSplitNamespaceBundle(asyncResponse,
+                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
+    @ApiOperation(value = "Get hash positions for topics")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public TopicHashPositions getTopicHashPositions(
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("bundle") String bundleRange,
+            @QueryParam("topicList") List<String> topicList) {

Review comment:
       Should `topicList` be in the request body instead? Too many topic names might exceed the URL length limit.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/common/naming/SpecifiedPositionsBundleSplitAlgorithm.java
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+
+/**
+ * This algorithm divides the bundle into several parts by the specified positions.
+ */
+public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm{
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        List<Long> positions = bundleSplitOption.getPositions();
+        if (positions == null || positions.size() == 0) {
+            return CompletableFuture.completedFuture(null);
+        }

Review comment:
       This should throw `IllegalArgumentException` instead of completing the future with `null`.
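
   For illustration, the guard could look like the following. The helper is hypothetical and standalone; in the PR it would simply replace the `completedFuture(null)` branch shown above.

```java
import java.util.List;

// Hypothetical guard, not part of Pulsar.
final class SplitPositionsGuardSketch {
    private SplitPositionsGuardSketch() {
    }

    // Fails fast on missing positions instead of passing null downstream.
    static List<Long> requirePositions(List<Long> positions) {
        if (positions == null || positions.isEmpty()) {
            throw new IllegalArgumentException("positions must not be null or empty");
        }
        return positions;
    }
}
```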

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
##########
@@ -892,7 +897,39 @@ void run() throws PulsarAdminException {
                 throw new ParameterException("--bundle and --bundle-type are mutually exclusive");
             }
             bundle = bundleType != null ? bundleType.toString() : bundle;
-            getAdmin().namespaces().splitNamespaceBundle(namespace, bundle, unload, splitAlgorithmName);
+            getAdmin().namespaces().splitNamespaceBundle(
+                    namespace, bundle, unload, splitAlgorithmName, splitBoundaries);
+        }
+    }
+
+    @Parameters(commandDescription = "Get the positions for one or more topic(s) in a namespace bundle")
+    private class GetTopicHashPositions extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(
+                names = { "--bundle", "-b" },
+                description = "{start-boundary}_{end-boundary} format namespace bundle",
+                required = false)
+        private String bundle;
+
+        @Parameter(
+                names = { "--topic-list",  "-tl" },
+                description = "The list of topics(both non-partitioned topic and partitioned topic) "
+                        + "to get positions in this bundle",
+                required = false)
+        private List<String> topicList;

Review comment:
       ```suggestion
           private List<String> topics;
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
##########
@@ -570,18 +571,34 @@ public void splitNamespaceBundle(
             @PathParam("bundle") String bundleRange,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("unload") @DefaultValue("false") boolean unload,
-            @QueryParam("splitAlgorithmName") String splitAlgorithmName) {
-
+            @QueryParam("splitAlgorithmName") String splitAlgorithmName,
+            @ApiParam("splitBoundaries") List<Long> splitBoundaries) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalSplitNamespaceBundle(asyncResponse, bundleRange, authoritative, unload, splitAlgorithmName);
+            internalSplitNamespaceBundle(asyncResponse,
+                    bundleRange, authoritative, unload, splitAlgorithmName, splitBoundaries);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
+    @ApiOperation(value = "Get hash positions for topics")
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist")})
+    public TopicHashPositions getTopicHashPositions(

Review comment:
       It's better to use `AsyncResponse` and make `internalGetTopicHashPositions` return `CompletableFuture<Optional<TopicHashPositions>>`.
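
   A rough sketch of the non-blocking shape, using only JDK types. Everything here is hypothetical: a `Map<String, Long>` stands in for `TopicHashPositions`, the `ownedTopics` future stands in for the broker's asynchronous bundle lookup, and `hashFunction` stands in for the bundle factory's hash. The point is only that nothing calls `.get()` on the future.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.ToLongFunction;

// Hypothetical sketch, not part of Pulsar.
final class AsyncHashPositionsSketch {
    private AsyncHashPositionsSketch() {
    }

    // Chains onto the lookup future, so no request thread blocks while it completes.
    static CompletableFuture<Optional<Map<String, Long>>> hashPositions(
            CompletableFuture<List<String>> ownedTopics,
            List<String> requested,
            ToLongFunction<String> hashFunction) {
        return ownedTopics.thenApply(owned -> {
            Map<String, Long> result = new LinkedHashMap<>();
            for (String topic : requested) {
                if (owned.contains(topic)) {
                    result.put(topic, hashFunction.applyAsLong(topic));
                }
            }
            // Empty Optional replaces the null return of the blocking version.
            return result.isEmpty() ? Optional.<Map<String, Long>>empty() : Optional.of(result);
        });
    }
}
```

   The REST method would then resume the `AsyncResponse` from a callback on the returned future, both for the result and for the failure case.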



