You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/30 09:21:16 UTC
[pulsar] branch master updated: WS support for eventtime (#4154)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3fd9168 WS support for eventtime (#4154)
3fd9168 is described below
commit 3fd9168049103165729b4735e123801dd6b8895a
Author: Marc Enriquez <ma...@nwd.mx>
AuthorDate: Tue Apr 30 11:21:11 2019 +0200
WS support for eventtime (#4154)
### Motivation
Websockets producer API lacks the message property `eventTime`, even when the client message supports it. This PR introduces support for WS producers to be able to optionally provide such property.
### Modifications
The change impacts 2 packages:
1. `pulsar-common`: within the utilities that this package provides, there's the `DateFormatter` which is used by the WS consumer API to format both the `eventTime` an the `publishTime`. The method `parse(String datetime)` is added to do the reverse process of `format(long timestamp)`. The method was added to this class as it's very close to it's functionality and depends on a private property that already contains proper timezone management.
2. `pulsar-websocket`: the `eventTime` property is added to `ProducerMessage` class for adequate mapping as a string. If the property is not null when processing the message (`ProducerHandler.onWebSocketText()`) and the parsing fails, a `FailedToDeserializeFromJSON` error is returned (I didn't want to introduce a new type of error); if parsing is successful, the provided `eventTime` is converted to milliseconds and passed to the builder.
---
.../java/org/apache/pulsar/common/util/DateFormatter.java | 12 ++++++++++++
.../java/org/apache/pulsar/websocket/ProducerHandler.java | 13 +++++++++++++
.../org/apache/pulsar/websocket/data/ProducerMessage.java | 1 +
3 files changed, 26 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
index 40eac9f..d4c6e1a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/DateFormatter.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
/**
* Date-time String formatter utility class
@@ -51,6 +52,17 @@ public class DateFormatter {
return DATE_FORMAT.format(instant);
}
+ /**
+ * @param datetime
+ * @return the parsed timestamp (in milliseconds) of the provided datetime
+ * @throws DateTimeParseException
+ */
+ public static long parse(String datetime) throws DateTimeParseException {
+ Instant instant = Instant.from(DATE_FORMAT.parse(datetime));
+
+ return instant.toEpochMilli();
+ }
+
private DateFormatter() {
}
}
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 338c0f1..b5fdd8a 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -28,6 +28,10 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Enums;
import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
import java.util.Base64;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -48,6 +52,7 @@ import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaEx
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerAck;
import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -189,6 +194,14 @@ public class ProducerHandler extends AbstractWebSocketHandler {
if (sendRequest.replicationClusters != null) {
builder.replicationClusters(sendRequest.replicationClusters);
}
+ if (sendRequest.eventTime != null) {
+ try {
+ builder.eventTime(DateFormatter.parse(sendRequest.eventTime));
+ } catch (DateTimeParseException e) {
+ sendAckResponse(new ProducerAck(PayloadEncodingError, e.getMessage(), null, requestContext));
+ return;
+ }
+ }
final long now = System.nanoTime();
builder.sendAsync().thenAccept(msgId -> {
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
index 7419a68..f3078da 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ProducerMessage.java
@@ -27,4 +27,5 @@ public class ProducerMessage {
public String context;
public String key;
public List<String> replicationClusters;
+ public String eventTime;
}