You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/30 09:21:16 UTC

[pulsar] branch master updated: WS support for eventtime (#4154)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd9168  WS support for eventtime (#4154)
3fd9168 is described below

commit 3fd9168049103165729b4735e123801dd6b8895a
Author: Marc Enriquez <ma...@nwd.mx>
AuthorDate: Tue Apr 30 11:21:11 2019 +0200

    WS support for eventtime (#4154)
    
    ### Motivation
    Websockets producer API lacks the message property `eventTime`, even when the client message supports it. This PR introduces support for WS producers to be able to optionally provide such property.
    
    ### Modifications
    The change impacts 2 packages:
    1. `pulsar-common`: within the utilities that this package provides, there's the `DateFormatter` which is used by the WS consumer API to format both the `eventTime` an the `publishTime`. The method `parse(String datetime)` is added to do the reverse process of `format(long timestamp)`. The method was added to this class as it's very close to it's functionality and depends on a private property that already contains proper timezone management.
    2. `pulsar-websocket`: the `eventTime` property is added to `ProducerMessage` class for adequate mapping as a string. If the property is not null when processing the message (`ProducerHandler.onWebSocketText()`) and the parsing fails, a `FailedToDeserializeFromJSON` error is returned (I didn't want to introduce a new type of error); if parsing is successful, the provided `eventTime` is converted to milliseconds and passed to the builder.
---
 .../java/org/apache/pulsar/common/util/DateFormatter.java   | 12 ++++++++++++
 .../java/org/apache/pulsar/websocket/ProducerHandler.java   | 13 +++++++++++++
 .../org/apache/pulsar/websocket/data/ProducerMessage.java   |  1 +
 3 files changed, 26 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
index 40eac9f..d4c6e1a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 
 /**
  * Date-time String formatter utility class
@@ -51,6 +52,17 @@ public class DateFormatter {
         return DATE_FORMAT.format(instant);
     }
 
+    /**
+     * @param datetime
+     * @return the parsed timestamp (in milliseconds) of the provided datetime
+     * @throws DateTimeParseException
+     */
+    public static long parse(String datetime) throws DateTimeParseException {
+        Instant instant = Instant.from(DATE_FORMAT.parse(datetime));
+
+        return instant.toEpochMilli();
+    }
+
     private DateFormatter() {
     }
 }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 338c0f1..b5fdd8a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -28,6 +28,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Enums;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
 import java.util.Base64;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -48,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaEx
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
 import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -189,6 +194,14 @@ public class ProducerHandler extends AbstractWebSocketHandler {
         if (sendRequest.replicationClusters != null) {
             builder.replicationClusters(sendRequest.replicationClusters);
         }
+        if (sendRequest.eventTime != null) {
+            try {
+                builder.eventTime(DateFormatter.parse(sendRequest.eventTime));
+            } catch (DateTimeParseException e) {
+                sendAckResponse(new ProducerAck(PayloadEncodingError, e.getMessage(), null, requestContext));
+                return;
+            }
+        }
 
         final long now = System.nanoTime();
         builder.sendAsync().thenAccept(msgId -> {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
index 7419a68..f3078da 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
@@ -27,4 +27,5 @@ public class ProducerMessage {
     public String context;
     public String key;
     public List<String> replicationClusters;
+    public String eventTime;
 }