You are viewing a plain text version of this content. The canonical version is the original message in the commits@pinot.apache.org mailing-list archive.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/14 17:25:42 UTC

[pinot] branch master updated: simulate rsvps after meetup.com retired the feed (#8180)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6cb4b4  simulate rsvps after meetup.com retired the feed (#8180)
a6cb4b4 is described below

commit a6cb4b4ee61b8a81b9d77c935f1bb60edef8a056
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Mon Feb 14 17:25:18 2022 +0000

    simulate rsvps after meetup.com retired the feed (#8180)
---
 .../pinot/tools/streams/MeetupRsvpJsonStream.java  |  25 ++--
 .../pinot/tools/streams/MeetupRsvpStream.java      | 142 +++++++++++----------
 .../java/org/apache/pinot/tools/streams/RSVP.java  |  46 +++++++
 3 files changed, 134 insertions(+), 79 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
index 586bfa7..3ceb373 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.tools.streams;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import javax.websocket.MessageHandler;
-import org.apache.pinot.spi.utils.JsonUtils;
+import java.util.function.Consumer;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -38,20 +36,17 @@ public class MeetupRsvpJsonStream extends MeetupRsvpStream {
   }
 
   @Override
-  protected MessageHandler.Whole<String> getMessageHandler() {
+  protected Consumer<RSVP> createConsumer() {
     return message -> {
-      if (_keepPublishing) {
-        if (_partitionByKey) {
-          try {
-            JsonNode messageJson = JsonUtils.stringToJsonNode(message);
-            String rsvpId = messageJson.get("rsvp_id").asText();
-            _producer.produce(_topicName, rsvpId.getBytes(UTF_8), message.getBytes(UTF_8));
-          } catch (Exception e) {
-            LOGGER.error("Caught exception while processing the message: {}", message, e);
-          }
-        } else {
-          _producer.produce(_topicName, message.getBytes(UTF_8));
+      if (_partitionByKey) {
+        try {
+          _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString()
+              .getBytes(UTF_8));
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while processing the message: {}", message, e);
         }
+      } else {
+        _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
       }
     };
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 5aa8abe..d10955a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -18,20 +18,20 @@
  */
 package org.apache.pinot.tools.streams;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.net.URI;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
 import java.util.Properties;
-import javax.websocket.ClientEndpointConfig;
-import javax.websocket.Endpoint;
-import javax.websocket.EndpointConfig;
-import javax.websocket.MessageHandler;
-import javax.websocket.Session;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Consumer;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.glassfish.tyrus.client.ClientManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +40,18 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 public class MeetupRsvpStream {
   protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
+      .parseCaseInsensitive()
+      .append(DateTimeFormatter.ISO_LOCAL_DATE)
+      .appendLiteral(' ')
+      .append(DateTimeFormatter.ISO_LOCAL_TIME)
+      .toFormatter();
   private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
   protected String _topicName = DEFAULT_TOPIC_NAME;
 
   protected final boolean _partitionByKey;
   protected final StreamDataProducer _producer;
-
-  protected ClientManager _client;
-  protected volatile boolean _keepPublishing;
+  private final Source _source;
 
   public MeetupRsvpStream()
       throws Exception {
@@ -63,77 +67,87 @@ public class MeetupRsvpStream {
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
     _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
-  }
-
-  public MeetupRsvpStream(boolean partitionByKey, StreamDataProducer producer, String topicName) {
-    _partitionByKey = partitionByKey;
-    _producer = producer;
-    _topicName = topicName;
+    _source = new Source(createConsumer());
   }
 
   public void run()
       throws Exception {
-    _client = ClientManager.createClient();
-    _keepPublishing = true;
-
-    _client.connectToServer(new Endpoint() {
-      @Override
-      public void onOpen(Session session, EndpointConfig config) {
-        session.addMessageHandler(String.class, getMessageHandler());
-      }
-    }, ClientEndpointConfig.Builder.create().build(), new URI("wss://stream.meetup.com/2/rsvps"));
+    _source.start();
   }
 
   public void stopPublishing() {
-    _keepPublishing = false;
-    _client.shutdown();
     _producer.close();
+    _source.close();
   }
 
-  protected MessageHandler.Whole<String> getMessageHandler() {
+  protected Consumer<RSVP> createConsumer() {
     return message -> {
       try {
-        JsonNode messageJson = JsonUtils.stringToJsonNode(message);
-        ObjectNode extractedJson = JsonUtils.newObjectNode();
-
-        JsonNode venue = messageJson.get("venue");
-        if (venue != null) {
-          extractedJson.set("venue_name", venue.get("venue_name"));
+        if (_partitionByKey) {
+          _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),
+              message.getPayload().toString().getBytes(UTF_8));
+        } else {
+          _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
         }
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while processing the message: {}", message, e);
+      }
+    };
+  }
 
-        JsonNode event = messageJson.get("event");
-        String eventId = "";
-        if (event != null) {
-          extractedJson.set("event_name", event.get("event_name"));
-          eventId = event.get("event_id").asText();
-          extractedJson.put("event_id", eventId);
-          extractedJson.set("event_time", event.get("time"));
-        }
+  private static class Source implements AutoCloseable, Runnable {
 
-        JsonNode group = messageJson.get("group");
-        if (group != null) {
-          extractedJson.set("group_city", group.get("group_city"));
-          extractedJson.set("group_country", group.get("group_country"));
-          extractedJson.set("group_id", group.get("group_id"));
-          extractedJson.set("group_name", group.get("group_name"));
-          extractedJson.set("group_lat", group.get("group_lat"));
-          extractedJson.set("group_lon", group.get("group_lon"));
-        }
+    private final Consumer<RSVP> _consumer;
+
+    private final ExecutorService _executorService = Executors.newSingleThreadExecutor();
+    private volatile Future<?> _future;
 
-        extractedJson.set("mtime", messageJson.get("mtime"));
-        extractedJson.put("rsvp_count", 1);
+    private Source(Consumer<RSVP> consumer) {
+      _consumer = consumer;
+    }
 
-        if (_keepPublishing) {
-          if (_partitionByKey) {
-            _producer.produce(_topicName, eventId.getBytes(UTF_8),
-                extractedJson.toString().getBytes(UTF_8));
-          } else {
-            _producer.produce(_topicName, extractedJson.toString().getBytes(UTF_8));
-          }
+    @Override
+    public void close() {
+      if (_future != null) {
+        _future.cancel(true);
+      }
+      _executorService.shutdownNow();
+    }
+
+    public void start() {
+      _future = _executorService.submit(this);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          RSVP rsvp = createMessage();
+          _consumer.accept(rsvp);
+          int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+          Thread.sleep(delay);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while processing the message: {}", message, e);
       }
-    };
+    }
+
+    private RSVP createMessage() {
+      String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+      ObjectNode json = JsonUtils.newObjectNode();
+      json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
+      json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
+      json.put("event_id", eventId);
+      json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+      json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
+      json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
+      json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
+      json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
+      json.put("group_lat", ThreadLocalRandom.current().nextFloat());
+      json.put("group_lon", ThreadLocalRandom.current().nextFloat());
+      json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+      json.put("rsvp_count", 1);
+      return new RSVP(eventId, eventId, json);
+    }
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java
new file mode 100644
index 0000000..a192c78
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RSVP.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+
+class RSVP {
+  private final String _eventId;
+  private final String _rsvpId;
+  private final JsonNode _payload;
+
+  RSVP(String eventId, String rsvpId, JsonNode payload) {
+    _eventId = eventId;
+    _rsvpId = rsvpId;
+    _payload = payload;
+  }
+
+  public String getEventId() {
+    return _eventId;
+  }
+
+  public String getRsvpId() {
+    return _rsvpId;
+  }
+
+  public JsonNode getPayload() {
+    return _payload;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org