You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/14 17:25:42 UTC
[pinot] branch master updated: simulate rsvps after meetup.com retired the feed (#8180)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a6cb4b4 simulate rsvps after meetup.com retired the feed (#8180)
a6cb4b4 is described below
commit a6cb4b4ee61b8a81b9d77c935f1bb60edef8a056
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Mon Feb 14 17:25:18 2022 +0000
simulate rsvps after meetup.com retired the feed (#8180)
---
.../pinot/tools/streams/MeetupRsvpJsonStream.java | 25 ++--
.../pinot/tools/streams/MeetupRsvpStream.java | 142 +++++++++++----------
.../java/org/apache/pinot/tools/streams/RSVP.java | 46 +++++++
3 files changed, 134 insertions(+), 79 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 586bfa7..3ceb373 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -18,9 +18,7 @@
*/
package org.apache.pinot.tools.streams;
-import com.fasterxml.jackson.databind.JsonNode;
-import javax.websocket.MessageHandler;
-import org.apache.pinot.spi.utils.JsonUtils;
+import java.util.function.Consumer;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -38,20 +36,17 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
}
@Override
- protected MessageHandler.Whole<String> getMessageHandler() {
+ protected Consumer<RSVP> createConsumer() {
return message -> {
- if (_keepPublishing) {
- if (_partitionByKey) {
- try {
- JsonNode messageJson = JsonUtils.stringToJsonNode(message);
- String rsvpId = messageJson.get("rsvp_id").asText();
- _producer.produce(_topicName, rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing the message: {}", message, e);
- }
- } else {
- _producer.produce(_topicName, message.getBytes(UTF_8));
+ if (_partitionByKey) {
+ try {
+ _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString()
+ .getBytes(UTF_8));
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing the message: {}", message, e);
}
+ } else {
+ _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
}
};
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 5aa8abe..d10955a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -18,20 +18,20 @@
*/
package org.apache.pinot.tools.streams;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.net.URI;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
import java.util.Properties;
-import javax.websocket.ClientEndpointConfig;
-import javax.websocket.Endpoint;
-import javax.websocket.EndpointConfig;
-import javax.websocket.MessageHandler;
-import javax.websocket.Session;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.glassfish.tyrus.client.ClientManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +40,18 @@ import static java.nio.charset.StandardCharsets.UTF_8;
public class MeetupRsvpStream {
protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .appendLiteral(' ')
+ .append(DateTimeFormatter.ISO_LOCAL_TIME)
+ .toFormatter();
private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
protected String _topicName = DEFAULT_TOPIC_NAME;
protected final boolean _partitionByKey;
protected final StreamDataProducer _producer;
-
- protected ClientManager _client;
- protected volatile boolean _keepPublishing;
+ private final Source _source;
public MeetupRsvpStream()
throws Exception {
@@ -63,77 +67,87 @@ public class MeetupRsvpStream {
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
_producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
- }
-
- public MeetupRsvpStream(boolean partitionByKey, StreamDataProducer producer, String topicName) {
- _partitionByKey = partitionByKey;
- _producer = producer;
- _topicName = topicName;
+ _source = new Source(createConsumer());
}
public void run()
throws Exception {
- _client = ClientManager.createClient();
- _keepPublishing = true;
-
- _client.connectToServer(new Endpoint() {
- @Override
- public void onOpen(Session session, EndpointConfig config) {
- session.addMessageHandler(String.class, getMessageHandler());
- }
- }, ClientEndpointConfig.Builder.create().build(), new URI("wss://stream.meetup.com/2/rsvps"));
+ _source.start();
}
public void stopPublishing() {
- _keepPublishing = false;
- _client.shutdown();
_producer.close();
+ _source.close();
}
- protected MessageHandler.Whole<String> getMessageHandler() {
+ protected Consumer<RSVP> createConsumer() {
return message -> {
try {
- JsonNode messageJson = JsonUtils.stringToJsonNode(message);
- ObjectNode extractedJson = JsonUtils.newObjectNode();
-
- JsonNode venue = messageJson.get("venue");
- if (venue != null) {
- extractedJson.set("venue_name", venue.get("venue_name"));
+ if (_partitionByKey) {
+ _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),
+ message.getPayload().toString().getBytes(UTF_8));
+ } else {
+ _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
}
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while processing the message: {}", message, e);
+ }
+ };
+ }
- JsonNode event = messageJson.get("event");
- String eventId = "";
- if (event != null) {
- extractedJson.set("event_name", event.get("event_name"));
- eventId = event.get("event_id").asText();
- extractedJson.put("event_id", eventId);
- extractedJson.set("event_time", event.get("time"));
- }
+ private static class Source implements AutoCloseable, Runnable {
- JsonNode group = messageJson.get("group");
- if (group != null) {
- extractedJson.set("group_city", group.get("group_city"));
- extractedJson.set("group_country", group.get("group_country"));
- extractedJson.set("group_id", group.get("group_id"));
- extractedJson.set("group_name", group.get("group_name"));
- extractedJson.set("group_lat", group.get("group_lat"));
- extractedJson.set("group_lon", group.get("group_lon"));
- }
+ private final Consumer<RSVP> _consumer;
+
+ private final ExecutorService _executorService = Executors.newSingleThreadExecutor();
+ private volatile Future<?> _future;
- extractedJson.set("mtime", messageJson.get("mtime"));
- extractedJson.put("rsvp_count", 1);
+ private Source(Consumer<RSVP> consumer) {
+ _consumer = consumer;
+ }
- if (_keepPublishing) {
- if (_partitionByKey) {
- _producer.produce(_topicName, eventId.getBytes(UTF_8),
- extractedJson.toString().getBytes(UTF_8));
- } else {
- _producer.produce(_topicName, extractedJson.toString().getBytes(UTF_8));
- }
+ @Override
+ public void close() {
+ if (_future != null) {
+ _future.cancel(true);
+ }
+ _executorService.shutdownNow();
+ }
+
+ public void start() {
+ _future = _executorService.submit(this);
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ RSVP rsvp = createMessage();
+ _consumer.accept(rsvp);
+ int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing the message: {}", message, e);
}
- };
+ }
+
+ private RSVP createMessage() {
+ String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+ ObjectNode json = JsonUtils.newObjectNode();
+ json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
+ json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
+ json.put("event_id", eventId);
+ json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+ json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
+ json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
+ json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
+ json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
+ json.put("group_lat", ThreadLocalRandom.current().nextFloat());
+ json.put("group_lon", ThreadLocalRandom.current().nextFloat());
+ json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+ json.put("rsvp_count", 1);
+ return new RSVP(eventId, eventId, json);
+ }
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java
new file mode 100644
index 0000000..a192c78
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+
+class RSVP {
+ private final String _eventId;
+ private final String _rsvpId;
+ private final JsonNode _payload;
+
+ RSVP(String eventId, String rsvpId, JsonNode payload) {
+ _eventId = eventId;
+ _rsvpId = rsvpId;
+ _payload = payload;
+ }
+
+ public String getEventId() {
+ return _eventId;
+ }
+
+ public String getRsvpId() {
+ return _rsvpId;
+ }
+
+ public JsonNode getPayload() {
+ return _payload;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org