Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/29 08:31:00 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #11139: [Issue 10896] add get message id by timestamp

codelipenghui commented on a change in pull request #11139:
URL: https://github.com/apache/pulsar/pull/11139#discussion_r660397751



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
##########
@@ -796,4 +798,55 @@ public void testSetReplicatedSubscriptionStatus() {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
     }
 
+    @Test
+    public void testGetMessageIdByTimestamp() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
+        final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        AtomicLong publishTime = new AtomicLong(0);
+        ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .intercept(new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        return true;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message message) {
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        publishTime.set(message.getPublishTime());
+                    }
+                })
+                .create();
+
+        MessageId id1 = producer.send("test1".getBytes());
+        long publish1 = publishTime.get();
+
+        Thread.sleep(10);

Review comment:
       Could this use Awaitility instead? Using sleep here will make the test flaky.
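
       One way to read this suggestion, as a minimal sketch only: wait on the condition the test actually needs (the clock has moved past the first publish time) rather than sleeping for a fixed interval. `AwaitSketch`, `awaitUntil` and `timestampAfter` are hypothetical names that are not part of Pulsar or Awaitility; they use plain JDK polling so the snippet runs on its own. With Awaitility on the test classpath, the equivalent would be roughly `Awaitility.await().until(() -> System.currentTimeMillis() > publish1)`.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper class (assumption): a stand-in for what Awaitility provides.
class AwaitSketch {

    // Polls the condition until it holds, failing once the timeout has elapsed.
    static void awaitUntil(BooleanSupplier condition, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(1); // short poll interval, not a fixed guess at the total wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }

    // Returns a wall-clock timestamp strictly greater than 'earlier'.
    static long timestampAfter(long earlier) {
        awaitUntil(() -> System.currentTimeMillis() > earlier, Duration.ofSeconds(5));
        return System.currentTimeMillis();
    }
}
```

       In the test above, a call such as `AwaitSketch.timestampAfter(publish1)` before the next send would make the ordering explicit instead of relying on a 10 ms sleep being long enough.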

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1515,6 +1515,39 @@ public void getMessageById(
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/messageid/{timestamp}")
+    @ApiOperation(value = "Get message id at or after this absolute timestamp (in ms).")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+                    + "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic is not non-partitioned and persistent"),
+            @ApiResponse(code = 412, message = "Topic name is not valid"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
+    public void getMessageIdByTimestamp(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Specify the timestamp", required = true)
+            @PathParam("timestamp") long timestamp,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetMessageIdByTimestamp(asyncResponse, timestamp, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));

Review comment:
       `internalGetMessageIdByTimestamp` can throw a RestException, so resuming the asyncResponse with `new RestException(RestException)` will be a problem here.
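
       A minimal sketch of one possible fix: catch the status-carrying exception first and resume with it unchanged, and wrap only unexpected failures. Everything here is a stand-in so the snippet runs without Pulsar or JAX-RS: `StatusException` is a hypothetical substitute for `RestException`, the `Consumer<Throwable>` stands in for `asyncResponse.resume(...)`, and the "wrapping yields 500" behaviour is an assumption of this sketch, not a description of Pulsar's class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class ResumeSketch {

    // Hypothetical stand-in (assumption) for an exception that carries an HTTP status.
    static class StatusException extends RuntimeException {
        final int status;

        StatusException(int status, String message) {
            super(message);
            this.status = status;
        }

        // Assumed behaviour for this sketch: wrapping any throwable gives a generic 500.
        StatusException(Throwable cause) {
            super(cause);
            this.status = 500;
        }
    }

    // Resumes with the original status exception as-is; wraps everything else.
    static void handle(Runnable operation, Consumer<Throwable> resume) {
        try {
            operation.run();
        } catch (StatusException e) {
            resume.accept(e);                      // keep the original status
        } catch (Exception e) {
            resume.accept(new StatusException(e)); // unexpected failure
        }
    }

    // Returns the status the caller was resumed with, or -1 if no error was reported.
    static int statusFor(Runnable operation) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        handle(operation, seen::set);
        Throwable t = seen.get();
        return t == null ? -1 : ((StatusException) t).status;
    }
}
```

       With this ordering, a 404 raised inside the operation reaches the caller as a 404 rather than being wrapped a second time.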

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2328,6 +2330,61 @@ public void readEntryComplete(Entry entry, Object ctx) {
         }
     }
 
+    protected void internalGetMessageIdByTimestamp(AsyncResponse asyncResponse, long timestamp, boolean authoritative) {

Review comment:
       It would be better to return a CompletableFuture<MessageIdImpl> directly; that will make the code easier to read. See `PersistentTopics.setMaxSubscriptionsPerTopic` for an example.
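
       A minimal sketch of the shape being suggested: the internal method returns a future, and only the endpoint decides how to resume the response. All names are hypothetical. `findMessageIdByTimestamp` stands in for the internal method, a `String` stands in for `MessageIdImpl`, and the `Consumer<Object>` stands in for `asyncResponse.resume(...)`, so the snippet runs with the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

class FutureSketch {

    // Hypothetical internal method (assumption): returns the result as a future
    // and knows nothing about the HTTP response.
    static CompletableFuture<String> findMessageIdByTimestamp(long timestamp) {
        if (timestamp < 0) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("negative timestamp"));
            return failed;
        }
        // Placeholder value; a real implementation would look up the message id.
        return CompletableFuture.completedFuture("messageId@" + timestamp);
    }

    // Endpoint side: the only place that resumes the response.
    static void endpoint(long timestamp, Consumer<Object> resume) {
        findMessageIdByTimestamp(timestamp)
                .thenAccept(resume)
                .exceptionally(ex -> {
                    // Dependent stages see failures wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    resume.accept(cause);
                    return null;
                });
    }
}
```

       Keeping the response handling in the endpoint means the internal method can be composed or tested without an `AsyncResponse` at all.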



