Posted to commits@pinot.apache.org by ri...@apache.org on 2022/04/26 08:12:13 UTC

[pinot] branch master updated: Refactor quickstart data source (#8567)

This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d36f3dfbf Refactor quickstart data source (#8567)
4d36f3dfbf is described below

commit 4d36f3dfbfc68e5b4f5cf24d7313d20a31e74155
Author: Xiaoman Dong <xi...@startree.ai>
AuthorDate: Tue Apr 26 01:12:07 2022 -0700

    Refactor quickstart data source (#8567)
    
    * save temp work
    
    * refactor done
    
    * fix
    
    * add comments
    
    * fix topic issue
    
    * fix executor
    
    * more documentation
    
    * more refactor
    
    * remove accidental file
    
    * remove extra member var
    
    * save temp work
    
    * remove rsvp json stream
    
    * fix
    
    * fix
    
    * fix
    
    * fix 4
    
    * set topic
    
    * fix my own test
    
    * generator
    
    * address comments
---
 .../pinot/spi/stream/StreamDataProducer.java       |  72 +++++++
 .../pinot/spi/stream/StreamDataProducerTest.java   |  55 +++++
 .../RealtimeComplexTypeHandlingQuickStart.java     |   4 +-
 .../pinot/tools/RealtimeJsonIndexQuickStart.java   |   4 +-
 .../apache/pinot/tools/UpsertJsonQuickStart.java   |   5 +-
 .../pinot/tools/streams/AirlineDataStream.java     | 112 ++--------
 .../tools/streams/AvroFileSourceGenerator.java     | 141 +++++++++++++
 .../pinot/tools/streams/MeetupRsvpJsonStream.java  |  53 -----
 .../pinot/tools/streams/MeetupRsvpStream.java      | 121 +++--------
 .../pinot/tools/streams/PinotRealtimeSource.java   | 190 +++++++++++++++++
 .../tools/streams/PinotSourceDataGenerator.java    |  44 ++++
 .../tools/streams/PinotStreamRateLimiter.java      |  21 +-
 .../pinot/tools/streams/RsvpSourceGenerator.java   |  98 +++++++++
 .../GithubPullRequestSourceGenerator.java          | 220 ++++++++++++++++++++
 .../PullRequestMergedEventsStream.java             | 227 ++-------------------
 .../tools/streams/PinotRealtimeSourceTest.java     |  65 ++++++
 16 files changed, 956 insertions(+), 476 deletions(-)
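
The refactor converges the quickstart streams on one pattern: a PinotSourceDataGenerator produces batches of RowWithKey, and a PinotRealtimeSource drives those batches into a StreamDataProducer. A minimal sketch of the resulting wiring, assuming the Kafka properties used by MeetupRsvpStream below (the QuickstartStreamSketch class name and main() harness are illustrative):

    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.spi.stream.StreamDataProvider;
    import org.apache.pinot.tools.streams.PinotRealtimeSource;
    import org.apache.pinot.tools.streams.RsvpSourceGenerator;
    import org.apache.pinot.tools.utils.KafkaStarterUtils;

    public class QuickstartStreamSketch {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        StreamDataProducer producer =
            StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
        // One generator plus one producer, driven forever by the realtime source
        PinotRealtimeSource source = PinotRealtimeSource.builder()
            .setGenerator(new RsvpSourceGenerator(RsvpSourceGenerator.KeyColumn.NONE))
            .setProducer(producer)
            .setTopic("meetupRSVPEvents")
            .build();
        source.run();
      }
    }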

diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index 6b3c14010d..d181c44225 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
+import javax.annotation.Nullable;
 
 
 /**
@@ -32,4 +35,73 @@ public interface StreamDataProducer {
   void produce(String topic, byte[] key, byte[] payload);
 
   void close();
+
+  /**
+   * Allows the producer to optimize for a batched write.
+   * This can increase throughput in some cases.
+   * @param topic the topic to write to
+   * @param rows the payload rows to write
+   */
+  default void produceBatch(String topic, List<byte[]> rows) {
+    for (byte[] row: rows) {
+      produce(topic, row);
+    }
+  }
+
+  /**
+   * Allows the producer to optimize for a batched write of keyed rows.
+   * This can increase throughput in some cases.
+   * @param topic the topic to write to
+   * @param payloadWithKey the payload rows, each with an optional key
+   */
+  default void produceKeyedBatch(String topic, List<RowWithKey> payloadWithKey) {
+    for (RowWithKey rowWithKey: payloadWithKey) {
+      if (rowWithKey.getKey() == null) {
+        produce(topic, rowWithKey.getPayload());
+      } else {
+        produce(topic, rowWithKey.getKey(), rowWithKey.getPayload());
+      }
+    }
+  }
+
+  /**
+   * Helper class that ties a key and payload together, avoiding a generic pair type.
+   * The class is intended for use with StreamDataProducer only.
+   */
+  class RowWithKey {
+    private final byte[] _key;
+    private final byte[] _payload;
+
+    public RowWithKey(@Nullable byte[] key, byte[] payload) {
+      _key = key;
+      _payload = payload;
+    }
+
+    public byte[] getKey() {
+      return _key;
+    }
+
+    public byte[] getPayload() {
+      return _payload;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      RowWithKey that = (RowWithKey) o;
+      return Arrays.equals(_key, that._key) && Arrays.equals(_payload, that._payload);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Arrays.hashCode(_key);
+      result = 31 * result + Arrays.hashCode(_payload);
+      return result;
+    }
+  }
 }
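
For context, a short sketch of how a caller might use the new batch API (the topic name and JSON payloads are illustrative; keys may be null, in which case produceKeyedBatch falls back to the un-keyed produce()):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.pinot.spi.stream.StreamDataProducer;

    class BatchPublishSketch {
      static void publish(StreamDataProducer producer) {
        List<StreamDataProducer.RowWithKey> rows = Arrays.asList(
            // Null key: routed through the un-keyed produce(topic, payload)
            new StreamDataProducer.RowWithKey(null, "{\"a\":1}".getBytes(StandardCharsets.UTF_8)),
            // Non-null key: routed through produce(topic, key, payload)
            new StreamDataProducer.RowWithKey("k1".getBytes(StandardCharsets.UTF_8),
                "{\"a\":2}".getBytes(StandardCharsets.UTF_8)));
        producer.produceKeyedBatch("myTopic", rows);
      }
    }
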
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java
new file mode 100644
index 0000000000..cd038bd801
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.stream;
+
+import java.nio.charset.StandardCharsets;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class StreamDataProducerTest {
+
+  @Test
+  public void testRowWithKeyEquals() {
+    byte[] b1 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
+    byte[] b2 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
+    byte[] b3 = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
+    byte[] k1 = "somekey".getBytes(StandardCharsets.UTF_8);
+    byte[] k3 = "anotherkey".getBytes(StandardCharsets.UTF_8);
+    StreamDataProducer.RowWithKey nullKey1 = new StreamDataProducer.RowWithKey(null, b1);
+    StreamDataProducer.RowWithKey nullKey2 = new StreamDataProducer.RowWithKey(null, b2);
+    StreamDataProducer.RowWithKey nullKey3 = new StreamDataProducer.RowWithKey(null, b3);
+    Assert.assertEquals(nullKey2, nullKey1);
+    Assert.assertEquals(nullKey1.hashCode(), nullKey2.hashCode());
+    Assert.assertNotEquals(nullKey3, nullKey1);
+    Assert.assertNotEquals(nullKey3.hashCode(), nullKey1.hashCode());
+
+    Assert.assertEquals(nullKey1, nullKey1);
+
+    StreamDataProducer.RowWithKey b2WithKey = new StreamDataProducer.RowWithKey(k1, b2);
+    Assert.assertNotEquals(nullKey2, b2WithKey);
+    StreamDataProducer.RowWithKey b1WithKey = new StreamDataProducer.RowWithKey(k1, b1);
+    Assert.assertEquals(b1WithKey, b2WithKey);
+    Assert.assertEquals(b1WithKey.hashCode(), b2WithKey.hashCode());
+
+    StreamDataProducer.RowWithKey b2WithDifferentKey = new StreamDataProducer.RowWithKey(k3, b2);
+    Assert.assertNotEquals(b2WithKey, b2WithDifferentKey);
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
index c07004653a..88106d3612 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
@@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -88,7 +88,7 @@ public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase {
     _kafkaStarter.start();
     _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
-    MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream();
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
     meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
index 3875543ef0..94aadea9e8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
@@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -87,7 +87,7 @@ public class RealtimeJsonIndexQuickStart extends QuickStartBase {
     _kafkaStarter.start();
     _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
-    MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream();
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
     meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 7a86ee87cc..b257d491a4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -32,7 +32,8 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.PinotAdministrator;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.streams.RsvpSourceGenerator;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 
 import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -87,7 +88,7 @@ public class UpsertJsonQuickStart extends QuickStartBase {
     _kafkaStarter.start();
     _kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
     printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
-    MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(true);
+    MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
     meetupRSVPProvider.run();
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index d0824e0054..2dee145ac0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -19,43 +19,28 @@
 package org.apache.pinot.tools.streams;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.tools.QuickStartBase;
 import org.apache.pinot.tools.Quickstart;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * This is used in Hybrid Quickstart.
  */
 public class AirlineDataStream {
-  private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
-
+  private static final String KAFKA_TOPIC_NAME = "flights-realtime";
   Schema _pinotSchema;
   String _timeColumnName;
   File _avroFile;
-  DataFileStream<GenericRecord> _avroDataStream;
-  Integer _currentTimeValue = 16102;
-  boolean _keepIndexing = true;
-  ExecutorService _service;
-  int _counter = 0;
+  final Integer _startTime = 16102;
   private StreamDataProducer _producer;
+  private PinotRealtimeSource _pinotStream;
 
   public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile)
       throws Exception {
@@ -67,9 +52,12 @@ public class AirlineDataStream {
     _pinotSchema = pinotSchema;
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _avroFile = avroFile;
-    createStream();
     _producer = producer;
-    _service = Executors.newFixedThreadPool(1);
+    AvroFileSourceGenerator generator = new AvroFileSourceGenerator(pinotSchema, avroFile, 1, _timeColumnName,
+        (rowNumber) -> (_startTime + rowNumber / 60));
+    _pinotStream =
+        PinotRealtimeSource.builder().setProducer(_producer).setGenerator(generator).setTopic(KAFKA_TOPIC_NAME)
+            .setMaxMessagePerSecond(1).build();
     QuickStartBase.printStatus(Quickstart.Color.YELLOW,
         "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time "
             + "every 60 events (which is approximately 60 seconds) *****");
@@ -85,84 +73,14 @@ public class AirlineDataStream {
     return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
   }
 
-  public void shutdown() {
-    _keepIndexing = false;
-    _avroDataStream = null;
-    _producer.close();
-    _producer = null;
-    _service.shutdown();
-  }
-
-  private void createStream()
-      throws IOException {
-    if (_keepIndexing) {
-      _avroDataStream = new DataFileStream<>(new FileInputStream(_avroFile), new GenericDatumReader<>());
-      return;
-    }
-    _avroDataStream = null;
-  }
-
-  private void publish(GenericRecord message)
-      throws IOException {
-    if (!_keepIndexing) {
-      _avroDataStream.close();
-      _avroDataStream = null;
-      return;
-    }
-    _producer.produce("flights-realtime", message.toString().getBytes("UTF-8"));
-  }
-
   public void run() {
+    _pinotStream.run();
+  }
 
-    _service.submit(new Runnable() {
-
-      @Override
-      public void run() {
-        while (true) {
-          while (_avroDataStream.hasNext()) {
-            if (!_keepIndexing) {
-              return;
-            }
-
-            GenericRecord record = _avroDataStream.next();
-
-            GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema));
-
-            for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) {
-              message.put(spec.getName(), record.get(spec.getName()));
-            }
-
-            for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) {
-              message.put(spec.getName(), record.get(spec.getName()));
-            }
-
-            message.put(_timeColumnName, _currentTimeValue);
-
-            try {
-              publish(message);
-              _counter++;
-              if (_counter % 60 == 0) {
-                _currentTimeValue = _currentTimeValue + 1;
-              }
-              Thread.sleep(1000);
-            } catch (Exception e) {
-              logger.error(e.getMessage());
-            }
-          }
-
-          try {
-            _avroDataStream.close();
-          } catch (IOException e) {
-            logger.error(e.getMessage());
-          }
-
-          try {
-            createStream();
-          } catch (IOException e) {
-            logger.error(e.getMessage());
-          }
-        }
-      }
-    });
+  public void shutdown()
+      throws Exception {
+    _pinotStream.close();
+    _producer.close();
+    _producer = null;
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java
new file mode 100644
index 0000000000..bfe6e34a68
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.Strings;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates a Pinot realtime source from an Avro file.
+ * It keeps looping over the same file and producing output rows. A lambda function can be passed in to compute
+ * the time index based on the row number.
+ */
+public class AvroFileSourceGenerator implements PinotSourceDataGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSourceGenerator.class);
+  private DataFileStream<GenericRecord> _avroDataStream;
+  private final Schema _pinotSchema;
+  private long _rowsProduced;
+  // If this var is null, we will not set time index column
+  private final String _timeColumnName;
+  private final Function<Long, Long> _rowNumberToTimeIndex;
+  private final File _avroFile;
+  private final int _rowsPerBatch;
+
+  /**
+   * Reads the Avro file, produces the rows, and keeps looping, without setting a time index
+   * @param pinotSchema the Pinot schema so the Avro rows can be produced
+   * @param avroFile the Avro file used as the source
+   */
+  public AvroFileSourceGenerator(Schema pinotSchema, File avroFile) {
+    this(pinotSchema, avroFile, 1, null, null);
+  }
+
+  /**
+   * Reads the Avro file, produces the rows, and keeps looping, allowing customization of the time index via a lambda
+   * @param pinotSchema the Pinot schema so the Avro rows can be produced
+   * @param avroFile the Avro file used as the source
+   * @param rowsPerBatch the number of rows to return in each batch
+   * @param timeColumnName the time column name for customizing/overriding the time index. Null skips customization.
+   * @param rowNumberToTimeIndex the lambda computing the time index from the row number. Null skips customization.
+   */
+  public AvroFileSourceGenerator(Schema pinotSchema, File avroFile, int rowsPerBatch,
+      @Nullable String timeColumnName, @Nullable Function<Long, Long> rowNumberToTimeIndex) {
+    _pinotSchema = pinotSchema;
+    _rowsProduced = 0;
+    _rowNumberToTimeIndex = rowNumberToTimeIndex;
+    _timeColumnName = timeColumnName;
+    if (!Strings.isNullOrEmpty(_timeColumnName)) {
+      DateTimeFieldSpec timeColumnSpec = pinotSchema.getSpecForTimeColumn(timeColumnName);
+      Preconditions.checkNotNull(timeColumnSpec,
+          "Time column " + timeColumnName + " is not found in schema, or is not a valid DateTime column");
+    }
+    _avroFile = avroFile;
+    _rowsPerBatch = rowsPerBatch;
+  }
+
+  @Override
+  public void init(Properties properties) {
+  }
+
+  @Override
+  public List<StreamDataProducer.RowWithKey> generateRows() {
+    List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>();
+    ensureStream();
+    int rowsInCurrentBatch = 0;
+    while (_avroDataStream.hasNext() && rowsInCurrentBatch < _rowsPerBatch) {
+      GenericRecord record = _avroDataStream.next();
+      GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema));
+      for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) {
+        message.put(spec.getName(), record.get(spec.getName()));
+      }
+
+      for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) {
+        message.put(spec.getName(), record.get(spec.getName()));
+      }
+      // Skip the time override when no time column was configured (see the two-argument constructor)
+      if (_timeColumnName != null && _rowNumberToTimeIndex != null) {
+        message.put(_timeColumnName, _rowNumberToTimeIndex.apply(_rowsProduced));
+      }
+      retVal.add(new StreamDataProducer.RowWithKey(null, message.toString().getBytes(StandardCharsets.UTF_8)));
+      _rowsProduced += 1;
+      rowsInCurrentBatch += 1;
+    }
+    return retVal;
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    _avroDataStream.close();
+  }
+
+  // Re-opens file stream if the file has reached its end.
+  private void ensureStream() {
+    try {
+      if (_avroDataStream != null && !_avroDataStream.hasNext()) {
+        _avroDataStream.close();
+        _avroDataStream = null;
+      }
+      if (_avroDataStream == null) {
+        _avroDataStream = new DataFileStream<>(new FileInputStream(_avroFile.getPath()), new GenericDatumReader<>());
+      }
+    } catch (IOException ex) {
+      LOGGER.error("Failed to open/close {}", _avroFile.getPath(), ex);
+      throw new RuntimeException("Failed to open/close " + _avroFile.getPath(), ex);
+    }
+  }
+}
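
A sketch of the time-index override, mirroring how the hybrid quickstart uses this generator above (the schema and Avro file paths and the DaysSinceEpoch column name are placeholders):

    import java.io.File;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.tools.streams.AvroFileSourceGenerator;

    class AvroGeneratorSketch {
      static AvroFileSourceGenerator create() throws Exception {
        Schema schema = Schema.fromFile(new File("airlineStats_schema.json")); // placeholder path
        long startTime = 16102;
        // One row per batch; the time index advances by one unit every 60 rows
        return new AvroFileSourceGenerator(schema, new File("airlineStats.avro"), 1,
            "DaysSinceEpoch", rowNumber -> startTime + rowNumber / 60);
      }
    }
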
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
deleted file mode 100644
index 3ceb3734bc..0000000000
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tools.streams;
-
-import java.util.function.Consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-public class MeetupRsvpJsonStream extends MeetupRsvpStream {
-
-  public MeetupRsvpJsonStream()
-      throws Exception {
-    super();
-  }
-
-  public MeetupRsvpJsonStream(boolean partitionByKey)
-      throws Exception {
-    super(partitionByKey);
-  }
-
-  @Override
-  protected Consumer<RSVP> createConsumer() {
-    return message -> {
-      if (_partitionByKey) {
-        try {
-          _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString()
-              .getBytes(UTF_8));
-        } catch (Exception e) {
-          LOGGER.error("Caught exception while processing the message: {}", message, e);
-        }
-      } else {
-        _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
-      }
-    };
-  }
-}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index d10955a22e..f42ffd7fe9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -18,40 +18,21 @@
  */
 package org.apache.pinot.tools.streams;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Consumer;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 
 public class MeetupRsvpStream {
   protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
-  private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
-      .parseCaseInsensitive()
-      .append(DateTimeFormatter.ISO_LOCAL_DATE)
-      .appendLiteral(' ')
-      .append(DateTimeFormatter.ISO_LOCAL_TIME)
-      .toFormatter();
   private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
   protected String _topicName = DEFAULT_TOPIC_NAME;
 
-  protected final boolean _partitionByKey;
-  protected final StreamDataProducer _producer;
-  private final Source _source;
+  protected PinotRealtimeSource _pinotRealtimeSource;
 
   public MeetupRsvpStream()
       throws Exception {
@@ -60,94 +41,42 @@ public class MeetupRsvpStream {
 
   public MeetupRsvpStream(boolean partitionByKey)
       throws Exception {
-    _partitionByKey = partitionByKey;
+    // Calling this constructor means we wish to use EVENT_ID as the key; RSVP_ID was used by the removed
+    // MeetupRsvpJsonStream
+    this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE);
+  }
 
+  public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
+      throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
-    _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
-    _source = new Source(createConsumer());
+    StreamDataProducer producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    _pinotRealtimeSource =
+        PinotRealtimeSource.builder().setGenerator(new RsvpSourceGenerator(keyColumn)).setProducer(producer)
+            .setRateLimiter(permits -> {
+              int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+              try {
+                Thread.sleep(delay);
+              } catch (InterruptedException ex) {
+                LOGGER.warn("Interrupted from sleep but will continue", ex);
+              }
+            })
+            .setTopic(_topicName)
+            .build();
   }
 
   public void run()
       throws Exception {
-    _source.start();
+    _pinotRealtimeSource.run();
   }
 
   public void stopPublishing() {
-    _producer.close();
-    _source.close();
-  }
-
-  protected Consumer<RSVP> createConsumer() {
-    return message -> {
-      try {
-        if (_partitionByKey) {
-          _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),
-              message.getPayload().toString().getBytes(UTF_8));
-        } else {
-          _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
-        }
-      } catch (Exception e) {
-        LOGGER.error("Caught exception while processing the message: {}", message, e);
-      }
-    };
-  }
-
-  private static class Source implements AutoCloseable, Runnable {
-
-    private final Consumer<RSVP> _consumer;
-
-    private final ExecutorService _executorService = Executors.newSingleThreadExecutor();
-    private volatile Future<?> _future;
-
-    private Source(Consumer<RSVP> consumer) {
-      _consumer = consumer;
-    }
-
-    @Override
-    public void close() {
-      if (_future != null) {
-        _future.cancel(true);
-      }
-      _executorService.shutdownNow();
-    }
-
-    public void start() {
-      _future = _executorService.submit(this);
-    }
-
-    @Override
-    public void run() {
-      while (!Thread.interrupted()) {
-        try {
-          RSVP rsvp = createMessage();
-          _consumer.accept(rsvp);
-          int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
-          Thread.sleep(delay);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-
-    private RSVP createMessage() {
-      String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
-      ObjectNode json = JsonUtils.newObjectNode();
-      json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
-      json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
-      json.put("event_id", eventId);
-      json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
-      json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
-      json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
-      json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
-      json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
-      json.put("group_lat", ThreadLocalRandom.current().nextFloat());
-      json.put("group_lon", ThreadLocalRandom.current().nextFloat());
-      json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
-      json.put("rsvp_count", 1);
-      return new RSVP(eventId, eventId, json);
+    try {
+      _pinotRealtimeSource.close();
+    } catch (Exception ex) {
+      LOGGER.error("Failed to close real time source. ignored and continue", ex);
     }
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
new file mode 100644
index 0000000000..87c527f9e1
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot realtime source that is capable of:
+ * 1. Running forever
+ * 2. Pulling from a generator and writing into a StreamDataProducer
+ * The source has a thread that loops forever.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+  public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+  public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+  public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+  public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+  final StreamDataProducer _producer;
+  final PinotSourceDataGenerator _generator;
+  final String _topicName;
+  final ExecutorService _executor;
+  final Properties _properties;
+  PinotStreamRateLimiter _rateLimiter;
+  protected volatile boolean _shutdown;
+
+  /**
+   * Constructs a source by passing in a Properties object, a generator, and a producer
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer) {
+    this(settings, generator, producer, null, null);
+  }
+
+  /**
+   * Constructs a source by passing in a Properties object, a generator, a producer, and an executor service
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   * @param executor the preferred executor to use instead of creating a thread pool. Null for the default one
+   * @param rateLimiter the specialized rate limiter for customization. Null for the default Guava-based one
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer,
+      @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+    _properties = settings;
+    _producer = producer;
+    Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+    _generator = generator;
+    Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+    _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+    _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+    Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+    _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+  }
+
+  public void run() {
+    _executor.execute(() -> {
+      while (!_shutdown) {
+        List<StreamDataProducer.RowWithKey> rows = _generator.generateRows();
+        // we expect the generator implementation to return empty rows when there is no data available
+        // as a stream, we expect data to be available all the time
+        if (rows.isEmpty()) {
+          try {
+            Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+          } catch (InterruptedException ex) {
+            LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+          }
+        } else {
+          _rateLimiter.acquire(rows.size());
+          if (!_shutdown) {
+            _producer.produceKeyedBatch(_topicName, rows);
+          }
+        }
+      }
+    });
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Signal the loop to stop before closing resources it may still be using
+    _shutdown = true;
+    _generator.close();
+    _producer.close();
+    _executor.shutdownNow();
+  }
+
+  /**
+   * A simple wrapper around the Guava rate limiter
+   */
+  private static class GuavaRateLimiter implements PinotStreamRateLimiter {
+    private final RateLimiter _rateLimiter;
+    public GuavaRateLimiter(long maxQps) {
+      _rateLimiter = RateLimiter.create(maxQps);
+    }
+    @Override
+    public void acquire(int permits) {
+      _rateLimiter.acquire(permits);
+    }
+  }
+
+  static long extractMaxQps(Properties settings) {
+    String qpsStr = settings.getProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(DEFAULT_MAX_MESSAGE_PER_SECOND));
+    long maxQps = DEFAULT_MAX_MESSAGE_PER_SECOND;
+    try {
+      maxQps = Long.parseLong(qpsStr);
+    } catch (NumberFormatException ex) {
+      LOGGER.warn("Cannot parse {} as max qps setting, using default {}", qpsStr, DEFAULT_MAX_MESSAGE_PER_SECOND);
+    }
+    return maxQps;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String _topic;
+    private long _maxMessagePerSecond;
+    private PinotSourceDataGenerator _generator;
+    private StreamDataProducer _producer;
+    private ExecutorService _executor;
+    private PinotStreamRateLimiter _rateLimiter;
+    public Builder setTopic(String topic) {
+      _topic = topic;
+      return this;
+    }
+
+    public Builder setMaxMessagePerSecond(long maxMessagePerSecond) {
+      _maxMessagePerSecond = maxMessagePerSecond;
+      return this;
+    }
+
+    public Builder setGenerator(PinotSourceDataGenerator generator) {
+      _generator = generator;
+      return this;
+    }
+
+    public Builder setProducer(StreamDataProducer producer) {
+      _producer = producer;
+      return this;
+    }
+
+    public Builder setExecutor(ExecutorService executor) {
+      _executor = executor;
+      return this;
+    }
+
+    public Builder setRateLimiter(PinotStreamRateLimiter rateLimiter) {
+      _rateLimiter = rateLimiter;
+      return this;
+    }
+
+    public PinotRealtimeSource build() {
+      Preconditions.checkNotNull(_topic, "PinotRealtimeSource should specify a topic name");
+      Properties properties = new Properties();
+      if (_maxMessagePerSecond > 0) {
+        properties.setProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(_maxMessagePerSecond));
+      }
+      properties.setProperty(KEY_OF_TOPIC_NAME, _topic);
+      return new PinotRealtimeSource(properties, _generator, _producer, _executor, _rateLimiter);
+    }
+  }
+}
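
The same source can be configured without the builder by setting the documented property keys directly, a sketch assuming the generator and producer already exist (the topic name and QPS value are illustrative):

    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.tools.streams.PinotRealtimeSource;
    import org.apache.pinot.tools.streams.PinotSourceDataGenerator;

    class DirectConstructionSketch {
      static PinotRealtimeSource create(PinotSourceDataGenerator generator, StreamDataProducer producer) {
        Properties settings = new Properties();
        settings.setProperty(PinotRealtimeSource.KEY_OF_TOPIC_NAME, "flights-realtime");
        // Cap the stream at 100 messages per second; defaults to Long.MAX_VALUE when unset
        settings.setProperty(PinotRealtimeSource.KEY_OF_MAX_MESSAGE_PER_SECOND, "100");
        return new PinotRealtimeSource(settings, generator, producer);
      }
    }
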
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java
new file mode 100644
index 0000000000..12e15e4cf3
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+
+
+/**
+ * Represents one Pinot realtime data source that can constantly generate data.
+ * For example, it can pull a batch from Kafka, or poll data via HTTP GET.
+ * The generator is driven by PinotRealtimeSource to keep producing into a downstream sink.
+ */
+public interface PinotSourceDataGenerator extends AutoCloseable {
+  /**
+   * Initializes the generator via a Properties object. It will be called at least once.
+   * @param properties the configuration properties
+   */
+  void init(Properties properties);
+
+  /**
+   * Generates a small batch of rows represented as bytes.
+   * It is up to the generator to define the binary format.
+   * @return a small list of RowWithKey; each element of the list will be written as one row of data
+   */
+  List<StreamDataProducer.RowWithKey> generateRows();
+}
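
A minimal custom generator against this interface, as a sketch (the JSON counter payload is illustrative):

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.tools.streams.PinotSourceDataGenerator;

    class CounterGenerator implements PinotSourceDataGenerator {
      private long _count;

      @Override
      public void init(Properties properties) {
        // No configuration needed for this sketch
      }

      @Override
      public List<StreamDataProducer.RowWithKey> generateRows() {
        // Emit one un-keyed JSON row per call
        byte[] payload = ("{\"count\":" + _count++ + "}").getBytes(StandardCharsets.UTF_8);
        return Collections.singletonList(new StreamDataProducer.RowWithKey(null, payload));
      }

      @Override
      public void close() {
        // Nothing to release
      }
    }
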
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
similarity index 70%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
copy to pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
index 6b3c14010d..ef5390acea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
@@ -16,20 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.stream;
-
-import java.util.Properties;
 
+package org.apache.pinot.tools.streams;
 
 /**
- * StreamDataProducer is the interface for stream data sources. E.g. KafkaDataProducer.
+ * Represents a very simple rate limiter that is used by Pinot
  */
-public interface StreamDataProducer {
-  void init(Properties props);
-
-  void produce(String topic, byte[] payload);
-
-  void produce(String topic, byte[] key, byte[] payload);
-
-  void close();
+@FunctionalInterface
+public interface PinotStreamRateLimiter {
+  /**
+   * Blocks the current thread until the requested number of permits is available
+   * @param permits the number of permits to acquire
+   */
+  void acquire(int permits);
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
new file mode 100644
index 0000000000..a09b6bb6fa
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.streams;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableList;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * A simple random generator that fakes RSVP events
+ */
+public class RsvpSourceGenerator implements PinotSourceDataGenerator {
+  private final KeyColumn _keyColumn;
+  public static final DateTimeFormatter DATE_TIME_FORMATTER =
+      new DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral(' ')
+          .append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter();
+
+  public RsvpSourceGenerator(KeyColumn keyColumn) {
+    _keyColumn = keyColumn;
+  }
+
+  public RSVP createMessage() {
+    String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+    ObjectNode json = JsonUtils.newObjectNode();
+    json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
+    json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
+    json.put("event_id", eventId);
+    json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+    json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
+    json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
+    json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
+    json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
+    json.put("group_lat", ThreadLocalRandom.current().nextFloat());
+    json.put("group_lon", ThreadLocalRandom.current().nextFloat());
+    json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+    json.put("rsvp_count", 1);
+    return new RSVP(eventId, eventId, json);
+  }
+
+  @Override
+  public void init(Properties properties) {
+  }
+
+  @Override
+  public List<StreamDataProducer.RowWithKey> generateRows() {
+    RSVP msg = createMessage();
+    byte[] key;
+    switch (_keyColumn) {
+      case EVENT_ID:
+        key = msg.getEventId().getBytes(UTF_8);
+        break;
+      case RSVP_ID:
+        key = msg.getRsvpId().getBytes(UTF_8);
+        break;
+      default:
+        key = null;
+        break;
+    }
+    return ImmutableList.of(new StreamDataProducer.RowWithKey(key, msg.getPayload().toString().getBytes(UTF_8)));
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+  }
+
+  public enum KeyColumn {
+    NONE,
+    EVENT_ID,
+    RSVP_ID
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java
new file mode 100644
index 0000000000..3d1b6708b4
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams.githubevents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.QuickStartBase;
+import org.apache.pinot.tools.Quickstart;
+import org.apache.pinot.tools.streams.PinotSourceDataGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pulls events from GitHub via its events API and converts them into byte[] so they can be written to Kafka
+ */
+public class GithubPullRequestSourceGenerator implements PinotSourceDataGenerator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GithubPullRequestSourceGenerator.class);
+  private static final long SLEEP_MILLIS = 10_000;
+
+  private GitHubAPICaller _gitHubAPICaller;
+  private Schema _avroSchema;
+  private String _etag = null;
+
+  public GithubPullRequestSourceGenerator(File schemaFile, String personalAccessToken)
+      throws Exception {
+    try {
+      _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile));
+    } catch (Exception e) {
+      LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]");
+      throw e;
+    }
+    _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
+  }
+
+  private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
+      throws IOException {
+    GenericRecord genericRecord = null;
+    String type = event.get("type").asText();
+
+    if ("PullRequestEvent".equals(type)) {
+      JsonNode payload = event.get("payload");
+      if (payload != null) {
+        String action = payload.get("action").asText();
+        JsonNode pullRequest = payload.get("pull_request");
+        String merged = pullRequest.get("merged").asText();
+        if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event
+
+          JsonNode commits = null;
+          String commitsURL = pullRequest.get("commits_url").asText();
+          GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL);
+
+          if (commitsResponse._responseString != null) {
+            commits = JsonUtils.stringToJsonNode(commitsResponse._responseString);
+          }
+
+          JsonNode reviewComments = null;
+          String reviewCommentsURL = pullRequest.get("review_comments_url").asText();
+          GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL);
+          if (reviewCommentsResponse._responseString != null) {
+            reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString);
+          }
+
+          JsonNode comments = null;
+          String commentsURL = pullRequest.get("comments_url").asText();
+          GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL);
+          if (commentsResponse._responseString != null) {
+            comments = JsonUtils.stringToJsonNode(commentsResponse._responseString);
+          }
+
+          // get PullRequestMergeEvent
+          PullRequestMergedEvent pullRequestMergedEvent =
+              new PullRequestMergedEvent(event, commits, reviewComments, comments);
+          // make generic record
+          genericRecord = convertToGenericRecord(pullRequestMergedEvent);
+        }
+      }
+    }
+    return genericRecord;
+  }
+
+  /**
+   * Convert the PullRequestMergedEvent to a GenericRecord
+   */
+  private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) {
+    GenericRecord genericRecord = new GenericData.Record(_avroSchema);
+
+    // Dimensions
+    genericRecord.put("title", pullRequestMergedEvent.getTitle());
+    genericRecord.put("labels", pullRequestMergedEvent.getLabels());
+    genericRecord.put("userId", pullRequestMergedEvent.getUserId());
+    genericRecord.put("userType", pullRequestMergedEvent.getUserType());
+    genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation());
+    genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy());
+    genericRecord.put("assignees", pullRequestMergedEvent.getAssignees());
+    genericRecord.put("committers", pullRequestMergedEvent.getCommitters());
+    genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers());
+    genericRecord.put("commenters", pullRequestMergedEvent.getCommenters());
+    genericRecord.put("authors", pullRequestMergedEvent.getAuthors());
+    genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers());
+    genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams());
+    genericRecord.put("repo", pullRequestMergedEvent.getRepo());
+    genericRecord.put("organization", pullRequestMergedEvent.getOrganization());
+
+    // Metrics
+    genericRecord.put("numComments", pullRequestMergedEvent.getNumComments());
+    genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments());
+    genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits());
+    genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded());
+    genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted());
+    genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged());
+    genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers());
+    genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters());
+    genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters());
+    genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors());
+    genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis());
+    genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis());
+
+    // Time column
+    genericRecord.put("mergedTimeMillis", pullRequestMergedEvent.getMergedTimeMillis());
+
+    return genericRecord;
+  }
+
+  @Override
+  public void init(Properties properties) {
+  }
+
+  @Override
+  public List<StreamDataProducer.RowWithKey> generateRows() {
+    List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>();
+    try {
+      GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(_etag);
+      switch (githubAPIResponse._statusCode) {
+        case 200: // Read new events
+          _etag = githubAPIResponse._etag;
+          JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString);
+          for (JsonNode eventElement : jsonArray) {
+            try {
+              GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement);
+              if (genericRecord != null) {
+                QuickStartBase.printStatus(Quickstart.Color.CYAN, genericRecord.toString());
+                retVal.add(
+                    new StreamDataProducer.RowWithKey(null, genericRecord.toString().getBytes(StandardCharsets.UTF_8)));
+              }
+            } catch (Exception e) {
+              LOGGER.error("Exception in publishing generic record. Skipping", e);
+            }
+          }
+          break;
+        case 304: // Not Modified
+          Quickstart.printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s.");
+          Thread.sleep(SLEEP_MILLIS);
+          break;
+        case 408: // Timeout
+          Quickstart.printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s.");
+          Thread.sleep(SLEEP_MILLIS);
+          break;
+        case 403: // Rate Limit exceeded
+          Quickstart.printStatus(Quickstart.Color.YELLOW,
+              "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs);
+          // wait until the advertised rate-limit reset time, but at least one minute
+          long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis());
+          Thread.sleep(sleepMs);
+          break;
+        case 401: // Unauthorized
+          String msg = "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage
+              + ". Exiting.";
+          Quickstart.printStatus(Quickstart.Color.YELLOW, msg);
+          throw new RuntimeException(msg);
+        default: // Unknown status code
+          Quickstart.printStatus(Quickstart.Color.YELLOW,
+              "Unknown status code " + githubAPIResponse._statusCode + " statusMessage "
+                  + githubAPIResponse._statusMessage + ". Retry in 10s");
+          Thread.sleep(SLEEP_MILLIS);
+          break;
+      }
+    } catch (Exception e) {
+      LOGGER.error("Exception in reading events data", e);
+      try {
+        Thread.sleep(SLEEP_MILLIS);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag for callers
+        LOGGER.error("Interrupted while backing off before retry", ex);
+      }
+    }
+    return retVal;
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    _gitHubAPICaller.shutdown();
+  }
+}
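
The generateRows() implementation above is an etag-based conditional poll: it replays
the ETag returned by the last 200 response so GitHub can answer 304 Not Modified
instead of resending the same events. For readers new to the pattern, here is a
minimal standalone sketch using only the JDK's java.net.http client (Java 11+); the
endpoint and header names are standard GitHub/HTTP, but the class name and loop body
are illustrative, not code from this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class EtagPollingSketch {
      public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String etag = null;
        while (true) {
          HttpRequest.Builder request = HttpRequest.newBuilder(URI.create("https://api.github.com/events"));
          if (etag != null) {
            request.header("If-None-Match", etag); // lets GitHub answer 304 instead of a full body
          }
          HttpResponse<String> response = client.send(request.build(), HttpResponse.BodyHandlers.ofString());
          if (response.statusCode() == 200) {
            etag = response.headers().firstValue("ETag").orElse(null);
            // parse response.body() into events and hand them to a producer here
          } // 304, 403 and 408 simply fall through and wait for the next poll
          Thread.sleep(10_000L); // same 10s cadence as SLEEP_MILLIS above
        }
      }
    }
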
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index a1703d8f6f..950af0d260 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -18,24 +18,16 @@
  */
 package org.apache.pinot.tools.streams.githubevents;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
 import java.net.URL;
-import java.nio.charset.StandardCharsets;
 import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Quickstart;
+import org.apache.pinot.tools.streams.PinotRealtimeSource;
+import org.apache.pinot.tools.streams.PinotSourceDataGenerator;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.tools.utils.KinesisStarterUtils;
 import org.apache.pinot.tools.utils.StreamSourceType;
@@ -53,38 +45,26 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
  */
 public class PullRequestMergedEventsStream {
   private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class);
-  private static final long SLEEP_MILLIS = 10_000;
 
-  private final ExecutorService _service;
-  private boolean _keepStreaming = true;
-
-  private final Schema _avroSchema;
-  private final String _topicName;
-  private final GitHubAPICaller _gitHubAPICaller;
-
-  private StreamDataProducer _producer;
+  private PinotRealtimeSource _pinotStream;
 
   public PullRequestMergedEventsStream(File schemaFile, String topicName, String personalAccessToken,
       StreamDataProducer producer)
       throws Exception {
-    _service = Executors.newFixedThreadPool(2);
-    try {
-      _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile));
-    } catch (Exception e) {
-      LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]");
-      throw e;
-    }
-    _topicName = topicName;
-    _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
-    _producer = producer;
+    PinotSourceDataGenerator generator = new GithubPullRequestSourceGenerator(schemaFile, personalAccessToken);
+    _pinotStream =
+        PinotRealtimeSource.builder().setProducer(producer).setGenerator(generator).setTopic(topicName).build();
   }
 
   public PullRequestMergedEventsStream(String schemaFilePath, String topicName, String personalAccessToken,
       StreamDataProducer producer)
       throws Exception {
-    _service = Executors.newFixedThreadPool(2);
+    this(getSchemaFile(schemaFilePath), topicName, personalAccessToken, producer);
+  }
+
+  public static File getSchemaFile(String schemaFilePath) {
+    File pinotSchema;
     try {
-      File pinotSchema;
       if (schemaFilePath == null) {
         ClassLoader classLoader = PullRequestMergedEventsStream.class.getClassLoader();
         URL resource = classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json");
@@ -93,14 +73,11 @@ public class PullRequestMergedEventsStream {
       } else {
         pinotSchema = new File(schemaFilePath);
       }
-      _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(pinotSchema));
     } catch (Exception e) {
       LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFilePath + "]");
       throw e;
     }
-    _topicName = topicName;
-    _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
-    _producer = producer;
+    return pinotSchema;
   }
 
   public static StreamDataProducer getKafkaStreamDataProducer()
@@ -181,187 +158,13 @@ public class PullRequestMergedEventsStream {
    * Shuts down the stream.
    */
   public void shutdown()
-      throws IOException, InterruptedException {
+      throws Exception {
     printStatus(Quickstart.Color.GREEN, "***** Shutting down pullRequestMergedEvents Stream *****");
-    _keepStreaming = false;
-    Thread.sleep(3000L);
-    _gitHubAPICaller.shutdown();
-    _producer.close();
-    _producer = null;
-    _service.shutdown();
-  }
-
-  /**
-   * Publishes the message to the kafka topic
-   */
-  private void publish(GenericRecord message)
-      throws IOException {
-    if (!_keepStreaming) {
-      return;
-    }
-    _producer.produce(_topicName, message.toString().getBytes(StandardCharsets.UTF_8));
+    _pinotStream.close();
   }
 
   public void start() {
-
     printStatus(Quickstart.Color.CYAN, "***** Starting pullRequestMergedEvents Stream *****");
-
-    _service.submit(() -> {
-
-      String etag = null;
-      while (true) {
-        if (!_keepStreaming) {
-          return;
-        }
-        try {
-          GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(etag);
-          switch (githubAPIResponse._statusCode) {
-            case 200: // Read new events
-              etag = githubAPIResponse._etag;
-              JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString);
-              for (JsonNode eventElement : jsonArray) {
-                try {
-                  GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement);
-                  if (genericRecord != null) {
-                    printStatus(Quickstart.Color.CYAN, genericRecord.toString());
-                    publish(genericRecord);
-                  }
-                } catch (Exception e) {
-                  LOGGER.error("Exception in publishing generic record. Skipping", e);
-                }
-              }
-              break;
-            case 304: // Not Modified
-              printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s.");
-              Thread.sleep(SLEEP_MILLIS);
-              break;
-            case 408: // Timeout
-              printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s.");
-              Thread.sleep(SLEEP_MILLIS);
-              break;
-            case 403: // Rate Limit exceeded
-              printStatus(Quickstart.Color.YELLOW,
-                  "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs);
-              long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis());
-              Thread.sleep(sleepMs);
-              break;
-            case 401: // Unauthorized
-              printStatus(Quickstart.Color.YELLOW,
-                  "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage
-                      + ". Exiting.");
-              return;
-            default: // Unknown status code
-              printStatus(Quickstart.Color.YELLOW,
-                  "Unknown status code " + githubAPIResponse._statusCode + " statusMessage "
-                      + githubAPIResponse._statusMessage + ". Retry in 10s");
-              Thread.sleep(SLEEP_MILLIS);
-              break;
-          }
-        } catch (Exception e) {
-          LOGGER.error("Exception in reading events data", e);
-          try {
-            Thread.sleep(SLEEP_MILLIS);
-          } catch (InterruptedException ex) {
-            LOGGER.error("Caught exception in retry", ex);
-          }
-        }
-      }
-    });
-  }
-
-  /**
-   * Checks for events of type PullRequestEvent which have action = closed and merged = true.
-   * Find commits, review comments, comments corresponding to this pull request event.
-   * Construct a PullRequestMergedEvent with the help of the event, commits, review comments and comments.
-   * Converts PullRequestMergedEvent to GenericRecord
-   * @param event
-   */
-  private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
-      throws IOException {
-    GenericRecord genericRecord = null;
-    String type = event.get("type").asText();
-
-    if ("PullRequestEvent".equals(type)) {
-      JsonNode payload = event.get("payload");
-      if (payload != null) {
-        String action = payload.get("action").asText();
-        JsonNode pullRequest = payload.get("pull_request");
-        String merged = pullRequest.get("merged").asText();
-        if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event
-
-          JsonNode commits = null;
-          String commitsURL = pullRequest.get("commits_url").asText();
-          GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL);
-
-          if (commitsResponse._responseString != null) {
-            commits = JsonUtils.stringToJsonNode(commitsResponse._responseString);
-          }
-
-          JsonNode reviewComments = null;
-          String reviewCommentsURL = pullRequest.get("review_comments_url").asText();
-          GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL);
-          if (reviewCommentsResponse._responseString != null) {
-            reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString);
-          }
-
-          JsonNode comments = null;
-          String commentsURL = pullRequest.get("comments_url").asText();
-          GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL);
-          if (commentsResponse._responseString != null) {
-            comments = JsonUtils.stringToJsonNode(commentsResponse._responseString);
-          }
-
-          // get PullRequestMergeEvent
-          PullRequestMergedEvent pullRequestMergedEvent =
-              new PullRequestMergedEvent(event, commits, reviewComments, comments);
-          // make generic record
-          genericRecord = convertToGenericRecord(pullRequestMergedEvent);
-        }
-      }
-    }
-    return genericRecord;
-  }
-
-  /**
-   * Convert the PullRequestMergedEvent to a GenericRecord
-   */
-  private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) {
-    GenericRecord genericRecord = new GenericData.Record(_avroSchema);
-
-    // Dimensions
-    genericRecord.put("title", pullRequestMergedEvent.getTitle());
-    genericRecord.put("labels", pullRequestMergedEvent.getLabels());
-    genericRecord.put("userId", pullRequestMergedEvent.getUserId());
-    genericRecord.put("userType", pullRequestMergedEvent.getUserType());
-    genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation());
-    genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy());
-    genericRecord.put("assignees", pullRequestMergedEvent.getAssignees());
-    genericRecord.put("committers", pullRequestMergedEvent.getCommitters());
-    genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers());
-    genericRecord.put("commenters", pullRequestMergedEvent.getCommenters());
-    genericRecord.put("authors", pullRequestMergedEvent.getAuthors());
-    genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers());
-    genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams());
-    genericRecord.put("repo", pullRequestMergedEvent.getRepo());
-    genericRecord.put("organization", pullRequestMergedEvent.getOrganization());
-
-    // Metrics
-    genericRecord.put("numComments", pullRequestMergedEvent.getNumComments());
-    genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments());
-    genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits());
-    genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded());
-    genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted());
-    genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged());
-    genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers());
-    genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters());
-    genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters());
-    genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors());
-    genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis());
-    genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis());
-
-    // Time column
-    genericRecord.put("mergedTimeMillis", pullRequestMergedEvent.getMergedTimeMillis());
-
-    return genericRecord;
+    _pinotStream.run();
   }
 }
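
After this refactor PullRequestMergedEventsStream is a thin wrapper that hands a
generator to PinotRealtimeSource. Here is a minimal sketch of driving it end to end,
assuming a locally running Kafka as in the quickstarts; the class name, topic name
and token handling below are placeholders, not part of this commit:

    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;

    public class GithubEventsStreamSketch {
      public static void main(String[] args) throws Exception {
        StreamDataProducer producer = PullRequestMergedEventsStream.getKafkaStreamDataProducer();
        PullRequestMergedEventsStream stream = new PullRequestMergedEventsStream(
            (String) null,             // null resolves to the bundled pullRequestMergedEvents_schema.json
            "pullRequestMergedEvents", // placeholder topic name
            args[0],                   // GitHub personal access token
            producer);
        stream.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          try {
            stream.shutdown();
          } catch (Exception e) {
            // best-effort cleanup on JVM exit
          }
        }));
      }
    }
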
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java
new file mode 100644
index 0000000000..14f915d62a
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.streams;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PinotRealtimeSourceTest {
+
+  @Test
+  public void testBuilder() {
+    StreamDataProducer producer = Mockito.mock(StreamDataProducer.class);
+    PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class);
+    PinotRealtimeSource realtimeSource =
+        PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).setGenerator(generator).build();
+    Assert.assertNotNull(realtimeSource);
+
+    PinotStreamRateLimiter limiter = Mockito.mock(PinotStreamRateLimiter.class);
+    ExecutorService executorService = Mockito.mock(ExecutorService.class);
+    realtimeSource = PinotRealtimeSource.builder().setRateLimiter(limiter).setProducer(producer).setGenerator(generator)
+        .setTopic("mytopic").setExecutor(executorService).setMaxMessagePerSecond(9527).build();
+    Assert.assertEquals(realtimeSource._executor, executorService);
+    Assert.assertEquals(realtimeSource._producer, producer);
+    Assert.assertEquals(realtimeSource._topicName, "mytopic");
+    String qps = realtimeSource._properties.get(PinotRealtimeSource.KEY_OF_MAX_MESSAGE_PER_SECOND).toString();
+    Assert.assertNotNull(qps);
+    Assert.assertEquals(qps, "9527");
+    Assert.assertEquals(realtimeSource._rateLimiter, limiter);
+  }
+
+  @Test(expectedExceptions = NullPointerException.class)
+  public void testBuilderNoNullProducerThrowExceptions() {
+    PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class);
+    // no assignment: build() is expected to throw before a source is returned
+    PinotRealtimeSource.builder().setTopic("mytopic").setGenerator(generator).build();
+  }
+
+  @Test(expectedExceptions = NullPointerException.class)
+  public void testBuilderNoNullGeneratorThrowExceptions() {
+    StreamDataProducer producer = Mockito.mock(StreamDataProducer.class);
+    // no assignment: build() is expected to throw before a source is returned
+    PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).build();
+  }
+}
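
The builder this test exercises with mocks accepts a real generator the same way.
Below is a sketch of the smallest useful PinotSourceDataGenerator, assuming only the
interface methods visible in this diff (init, generateRows, close) and the builder
setters used in the test; the class, topic and rate are made up for illustration:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;

    public class CountingGenerator implements PinotSourceDataGenerator {
      private long _count;

      @Override
      public void init(Properties properties) {
        // nothing to configure for this sketch
      }

      @Override
      public List<StreamDataProducer.RowWithKey> generateRows() {
        byte[] payload = ("{\"count\": " + _count++ + "}").getBytes(StandardCharsets.UTF_8);
        return Collections.singletonList(new StreamDataProducer.RowWithKey(null, payload));
      }

      @Override
      public void close() {
      }
    }

Wired in the same way the test does, minus the mocks:

    PinotRealtimeSource source = PinotRealtimeSource.builder()
        .setTopic("counts")                    // arbitrary topic
        .setProducer(producer)                 // e.g. a Kafka producer from StreamDataProvider
        .setGenerator(new CountingGenerator())
        .setMaxMessagePerSecond(10)            // throttled by the source's rate limiter
        .build();
    source.run();
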

