Posted to commits@pinot.apache.org by ri...@apache.org on 2022/04/26 08:12:13 UTC
[pinot] branch master updated: Refactor quickstart data source (#8567)
This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4d36f3dfbf Refactor quickstart data source (#8567)
4d36f3dfbf is described below
commit 4d36f3dfbfc68e5b4f5cf24d7313d20a31e74155
Author: Xiaoman Dong <xi...@startree.ai>
AuthorDate: Tue Apr 26 01:12:07 2022 -0700
Refactor quickstart data source (#8567)
* save temp work
* refactor done
* fix
* add comments
* fix topic issue
* fix executor
* more documentation
* more refactor
* remove accidental file
* remove extra member var
* save temp work
* remove rsvp json stream
* fix
* fix
* fix
* fix 4
* set topic
* fix my own test
* generator
* address comments
---
.../pinot/spi/stream/StreamDataProducer.java | 72 +++++++
.../pinot/spi/stream/StreamDataProducerTest.java | 55 +++++
.../RealtimeComplexTypeHandlingQuickStart.java | 4 +-
.../pinot/tools/RealtimeJsonIndexQuickStart.java | 4 +-
.../apache/pinot/tools/UpsertJsonQuickStart.java | 5 +-
.../pinot/tools/streams/AirlineDataStream.java | 112 ++--------
.../tools/streams/AvroFileSourceGenerator.java | 141 +++++++++++++
.../pinot/tools/streams/MeetupRsvpJsonStream.java | 53 -----
.../pinot/tools/streams/MeetupRsvpStream.java | 121 +++--------
.../pinot/tools/streams/PinotRealtimeSource.java | 190 +++++++++++++++++
.../tools/streams/PinotSourceDataGenerator.java | 44 ++++
.../tools/streams/PinotStreamRateLimiter.java | 21 +-
.../pinot/tools/streams/RsvpSourceGenerator.java | 98 +++++++++
.../GithubPullRequestSourceGenerator.java | 220 ++++++++++++++++++++
.../PullRequestMergedEventsStream.java | 227 ++-------------------
.../tools/streams/PinotRealtimeSourceTest.java | 65 ++++++
16 files changed, 956 insertions(+), 476 deletions(-)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
index 6b3c14010d..d181c44225 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.Arrays;
+import java.util.List;
import java.util.Properties;
+import javax.annotation.Nullable;
/**
@@ -32,4 +35,73 @@ public interface StreamDataProducer {
void produce(String topic, byte[] key, byte[] payload);
void close();
+
+ /**
+ * Allows the producer to optimize for a batched write.
+ * This can increase throughput in some cases.
+ * @param topic the topic to write to
+ * @param rows the payload rows to write
+ */
+ default void produceBatch(String topic, List<byte[]> rows) {
+ for (byte[] row: rows) {
+ produce(topic, row);
+ }
+ }
+
+ /**
+ * Allows the producer to optimize for a batched write.
+ * This can increase throughput in some cases.
+ * @param topic the topic to write to
+ * @param payloadWithKey the payload rows, each paired with its (possibly null) key
+ */
+ default void produceKeyedBatch(String topic, List<RowWithKey> payloadWithKey) {
+ for (RowWithKey rowWithKey: payloadWithKey) {
+ if (rowWithKey.getKey() == null) {
+ produce(topic, rowWithKey.getPayload());
+ } else {
+ produce(topic, rowWithKey.getKey(), rowWithKey.getPayload());
+ }
+ }
+ }
+
+ /**
+ * Helper class that ties a key and payload together instead of using a generic pair.
+ * The class is intended for use with StreamDataProducer only.
+ */
+ class RowWithKey {
+ private final byte[] _key;
+ private final byte[] _payload;
+
+ public RowWithKey(@Nullable byte[] key, byte[] payload) {
+ _key = key;
+ _payload = payload;
+ }
+
+ public byte[] getKey() {
+ return _key;
+ }
+
+ public byte[] getPayload() {
+ return _payload;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowWithKey that = (RowWithKey) o;
+ return Arrays.equals(_key, that._key) && Arrays.equals(_payload, that._payload);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode(_key);
+ result = 31 * result + Arrays.hashCode(_payload);
+ return result;
+ }
+ }
}
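
As a usage illustration of the new batch API (the producer wiring and "demo-topic" below are hypothetical; only methods shown in this diff are used):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.spi.stream.StreamDataProducer;

    public class BatchProduceExample {
      // Assumes 'producer' was already initialized, e.g. via StreamDataProvider.
      static void publish(StreamDataProducer producer) {
        List<StreamDataProducer.RowWithKey> batch = new ArrayList<>();
        // A null key routes through the un-keyed produce(topic, payload) overload.
        batch.add(new StreamDataProducer.RowWithKey(null, "{\"id\":1}".getBytes(StandardCharsets.UTF_8)));
        batch.add(new StreamDataProducer.RowWithKey("key-2".getBytes(StandardCharsets.UTF_8),
            "{\"id\":2}".getBytes(StandardCharsets.UTF_8)));
        producer.produceKeyedBatch("demo-topic", batch);
      }
    }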
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java
new file mode 100644
index 0000000000..cd038bd801
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamDataProducerTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.stream;
+
+import java.nio.charset.StandardCharsets;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class StreamDataProducerTest {
+
+ @Test
+ public void testRowWithKeyEquals() {
+ byte[] b1 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
+ byte[] b2 = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
+ byte[] b3 = new byte[]{1, 2, 3, 4, 5, 6, 7, 8};
+ byte[] k1 = "somekey".getBytes(StandardCharsets.UTF_8);
+ byte[] k3 = "anotherkey".getBytes(StandardCharsets.UTF_8);
+ StreamDataProducer.RowWithKey nullKey1 = new StreamDataProducer.RowWithKey(null, b1);
+ StreamDataProducer.RowWithKey nullKey2 = new StreamDataProducer.RowWithKey(null, b2);
+ StreamDataProducer.RowWithKey nullKey3 = new StreamDataProducer.RowWithKey(null, b3);
+ Assert.assertEquals(nullKey2, nullKey1);
+ Assert.assertEquals(nullKey1.hashCode(), nullKey2.hashCode());
+ Assert.assertNotEquals(nullKey3, nullKey1);
+ Assert.assertNotEquals(nullKey3.hashCode(), nullKey1.hashCode());
+
+ Assert.assertEquals(nullKey1, nullKey1);
+
+ StreamDataProducer.RowWithKey b2WithKey = new StreamDataProducer.RowWithKey(k1, b2);
+ Assert.assertNotEquals(nullKey2, b2WithKey);
+ StreamDataProducer.RowWithKey b1WithKey = new StreamDataProducer.RowWithKey(k1, b1);
+ Assert.assertEquals(b1WithKey, b2WithKey);
+ Assert.assertEquals(b1WithKey.hashCode(), b2WithKey.hashCode());
+
+ StreamDataProducer.RowWithKey b2WithDifferentKey = new StreamDataProducer.RowWithKey(k3, b2);
+ Assert.assertNotEquals(b2WithKey, b2WithDifferentKey);
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
index c07004653a..88106d3612 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeComplexTypeHandlingQuickStart.java
@@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -88,7 +88,7 @@ public class RealtimeComplexTypeHandlingQuickStart extends QuickStartBase {
_kafkaStarter.start();
_kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
- MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream();
+ MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
meetupRSVPProvider.run();
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
index 3875543ef0..94aadea9e8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeJsonIndexQuickStart.java
@@ -32,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -87,7 +87,7 @@ public class RealtimeJsonIndexQuickStart extends QuickStartBase {
_kafkaStarter.start();
_kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
- MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream();
+ MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream();
meetupRSVPProvider.run();
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
index 7a86ee87cc..b257d491a4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/UpsertJsonQuickStart.java
@@ -32,7 +32,8 @@ import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.PinotAdministrator;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
-import org.apache.pinot.tools.streams.MeetupRsvpJsonStream;
+import org.apache.pinot.tools.streams.MeetupRsvpStream;
+import org.apache.pinot.tools.streams.RsvpSourceGenerator;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import static org.apache.pinot.tools.Quickstart.prettyPrintResponse;
@@ -87,7 +88,7 @@ public class UpsertJsonQuickStart extends QuickStartBase {
_kafkaStarter.start();
_kafkaStarter.createTopic("meetupRSVPEvents", KafkaStarterUtils.getTopicCreationProps(2));
printStatus(Color.CYAN, "***** Starting meetup data stream and publishing to Kafka *****");
- MeetupRsvpJsonStream meetupRSVPProvider = new MeetupRsvpJsonStream(true);
+ MeetupRsvpStream meetupRSVPProvider = new MeetupRsvpStream(RsvpSourceGenerator.KeyColumn.RSVP_ID);
meetupRSVPProvider.run();
printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
runner.startAll();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index d0824e0054..2dee145ac0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -19,43 +19,28 @@
package org.apache.pinot.tools.streams;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.tools.QuickStartBase;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This is used in Hybrid Quickstart.
*/
public class AirlineDataStream {
- private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
-
+ private static final String KAFKA_TOPIC_NAME = "flights-realtime";
Schema _pinotSchema;
String _timeColumnName;
File _avroFile;
- DataFileStream<GenericRecord> _avroDataStream;
- Integer _currentTimeValue = 16102;
- boolean _keepIndexing = true;
- ExecutorService _service;
- int _counter = 0;
+ final Integer _startTime = 16102;
private StreamDataProducer _producer;
+ private PinotRealtimeSource _pinotStream;
public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File avroFile)
throws Exception {
@@ -67,9 +52,12 @@ public class AirlineDataStream {
_pinotSchema = pinotSchema;
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
_avroFile = avroFile;
- createStream();
_producer = producer;
- _service = Executors.newFixedThreadPool(1);
+ AvroFileSourceGenerator generator = new AvroFileSourceGenerator(pinotSchema, avroFile, 1, _timeColumnName,
+ (rowNumber) -> (_startTime + rowNumber / 60));
+ _pinotStream =
+ PinotRealtimeSource.builder().setProducer(_producer).setGenerator(generator).setTopic(KAFKA_TOPIC_NAME)
+ .setMaxMessagePerSecond(1).build();
QuickStartBase.printStatus(Quickstart.Color.YELLOW,
"***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time "
+ "every 60 events (which is approximately 60 seconds) *****");
@@ -85,84 +73,14 @@ public class AirlineDataStream {
return StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
}
- public void shutdown() {
- _keepIndexing = false;
- _avroDataStream = null;
- _producer.close();
- _producer = null;
- _service.shutdown();
- }
-
- private void createStream()
- throws IOException {
- if (_keepIndexing) {
- _avroDataStream = new DataFileStream<>(new FileInputStream(_avroFile), new GenericDatumReader<>());
- return;
- }
- _avroDataStream = null;
- }
-
- private void publish(GenericRecord message)
- throws IOException {
- if (!_keepIndexing) {
- _avroDataStream.close();
- _avroDataStream = null;
- return;
- }
- _producer.produce("flights-realtime", message.toString().getBytes("UTF-8"));
- }
-
public void run() {
+ _pinotStream.run();
+ }
- _service.submit(new Runnable() {
-
- @Override
- public void run() {
- while (true) {
- while (_avroDataStream.hasNext()) {
- if (!_keepIndexing) {
- return;
- }
-
- GenericRecord record = _avroDataStream.next();
-
- GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema));
-
- for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) {
- message.put(spec.getName(), record.get(spec.getName()));
- }
-
- for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) {
- message.put(spec.getName(), record.get(spec.getName()));
- }
-
- message.put(_timeColumnName, _currentTimeValue);
-
- try {
- publish(message);
- _counter++;
- if (_counter % 60 == 0) {
- _currentTimeValue = _currentTimeValue + 1;
- }
- Thread.sleep(1000);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
-
- try {
- _avroDataStream.close();
- } catch (IOException e) {
- logger.error(e.getMessage());
- }
-
- try {
- createStream();
- } catch (IOException e) {
- logger.error(e.getMessage());
- }
- }
- }
- });
+ public void shutdown()
+ throws Exception {
+ _pinotStream.close();
+ _producer.close();
+ _producer = null;
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java
new file mode 100644
index 0000000000..bfe6e34a68
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AvroFileSourceGenerator.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.Strings;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates a Pinot real-time source from an Avro file.
+ * It keeps looping over the same file to produce output data. A lambda function can be passed in to compute
+ * the time index based on the row number.
+ */
+public class AvroFileSourceGenerator implements PinotSourceDataGenerator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSourceGenerator.class);
+ private DataFileStream<GenericRecord> _avroDataStream;
+ private final Schema _pinotSchema;
+ private long _rowsProduced;
+ // If this var is null, we will not set time index column
+ private final String _timeColumnName;
+ private final Function<Long, Long> _rowNumberToTimeIndex;
+ private final File _avroFile;
+ private final int _rowsPerBatch;
+
+ /**
+ * Reads the Avro file, produces the rows, and then keeps looping without setting a time index
+ * @param pinotSchema the Pinot schema so the Avro rows can be produced
+ * @param avroFile the Avro file used as the source
+ */
+ public AvroFileSourceGenerator(Schema pinotSchema, File avroFile) {
+ this(pinotSchema, avroFile, 1, null, null);
+ }
+
+ /**
+ * Reads the Avro file, produces the rows, and keeps looping; allows customization of the time index via a lambda
+ * @param pinotSchema the Pinot schema so the Avro rows can be produced
+ * @param avroFile the Avro file used as the source
+ * @param rowsPerBatch the number of rows to return in each batch
+ * @param timeColumnName the time column name for customizing/overriding the time index. Null to skip customization.
+ * @param rowNumberToTimeIndex the lambda computing the time index from the row number. Null to skip customization.
+ */
+ public AvroFileSourceGenerator(Schema pinotSchema, File avroFile, int rowsPerBatch,
+ @Nullable String timeColumnName, @Nullable Function<Long, Long> rowNumberToTimeIndex) {
+ _pinotSchema = pinotSchema;
+ _rowsProduced = 0;
+ _rowNumberToTimeIndex = rowNumberToTimeIndex;
+ _timeColumnName = timeColumnName;
+ if (!Strings.isNullOrEmpty(_timeColumnName)) {
+ DateTimeFieldSpec timeColumnSpec = pinotSchema.getSpecForTimeColumn(timeColumnName);
+ Preconditions.checkNotNull(timeColumnSpec,
+ "Time column " + timeColumnName + " is not found in schema, or is not a valid DateTime column");
+ }
+ _avroFile = avroFile;
+ _rowsPerBatch = rowsPerBatch;
+ }
+
+ @Override
+ public void init(Properties properties) {
+ }
+
+ @Override
+ public List<StreamDataProducer.RowWithKey> generateRows() {
+ List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>();
+ ensureStream();
+ int rowsInCurrentBatch = 0;
+ while (_avroDataStream.hasNext() && rowsInCurrentBatch < _rowsPerBatch) {
+ GenericRecord record = _avroDataStream.next();
+ GenericRecord message = new GenericData.Record(AvroUtils.getAvroSchemaFromPinotSchema(_pinotSchema));
+ for (FieldSpec spec : _pinotSchema.getDimensionFieldSpecs()) {
+ message.put(spec.getName(), record.get(spec.getName()));
+ }
+
+ for (FieldSpec spec : _pinotSchema.getMetricFieldSpecs()) {
+ message.put(spec.getName(), record.get(spec.getName()));
+ }
+ if (_timeColumnName != null && _rowNumberToTimeIndex != null) {
+ message.put(_timeColumnName, _rowNumberToTimeIndex.apply(_rowsProduced));
+ }
+ retVal.add(new StreamDataProducer.RowWithKey(null, message.toString().getBytes(StandardCharsets.UTF_8)));
+ _rowsProduced += 1;
+ rowsInCurrentBatch += 1;
+ }
+ return retVal;
+ }
+
+ @Override
+ public void close()
+ throws Exception {
+ _avroDataStream.close();
+ }
+
+ // Re-opens file stream if the file has reached its end.
+ private void ensureStream() {
+ try {
+ if (_avroDataStream != null && !_avroDataStream.hasNext()) {
+ _avroDataStream.close();
+ _avroDataStream = null;
+ }
+ if (_avroDataStream == null) {
+ _avroDataStream = new DataFileStream<>(new FileInputStream(_avroFile.getPath()), new GenericDatumReader<>());
+ }
+ } catch (IOException ex) {
+ LOGGER.error("Failed to open/close {}", _avroFile.getPath(), ex);
+ throw new RuntimeException("Failed to open/close " + _avroFile.getPath(), ex);
+ }
+ }
+}
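
As a usage illustration (the schema and Avro file paths are hypothetical), the generator can be wired the same way the refactored AirlineDataStream above wires it, advancing the time column once every 60 rows:

    import java.io.File;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.tools.streams.AvroFileSourceGenerator;

    public class AvroGeneratorExample {
      public static AvroFileSourceGenerator build() throws Exception {
        // Hypothetical local files; "DaysSinceEpoch" must be a DateTime column in the schema.
        Schema pinotSchema = Schema.fromFile(new File("airlineStats_schema.json"));
        return new AvroFileSourceGenerator(pinotSchema, new File("airlineStats_data.avro"),
            1, "DaysSinceEpoch", rowNumber -> 16102L + rowNumber / 60);
      }
    }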
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
deleted file mode 100644
index 3ceb3734bc..0000000000
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tools.streams;
-
-import java.util.function.Consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-public class MeetupRsvpJsonStream extends MeetupRsvpStream {
-
- public MeetupRsvpJsonStream()
- throws Exception {
- super();
- }
-
- public MeetupRsvpJsonStream(boolean partitionByKey)
- throws Exception {
- super(partitionByKey);
- }
-
- @Override
- protected Consumer<RSVP> createConsumer() {
- return message -> {
- if (_partitionByKey) {
- try {
- _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString()
- .getBytes(UTF_8));
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing the message: {}", message, e);
- }
- } else {
- _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
- }
- };
- }
-}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index d10955a22e..f42ffd7fe9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -18,40 +18,21 @@
*/
package org.apache.pinot.tools.streams;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Consumer;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
public class MeetupRsvpStream {
protected static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
- private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
- .parseCaseInsensitive()
- .append(DateTimeFormatter.ISO_LOCAL_DATE)
- .appendLiteral(' ')
- .append(DateTimeFormatter.ISO_LOCAL_TIME)
- .toFormatter();
private static final String DEFAULT_TOPIC_NAME = "meetupRSVPEvents";
protected String _topicName = DEFAULT_TOPIC_NAME;
- protected final boolean _partitionByKey;
- protected final StreamDataProducer _producer;
- private final Source _source;
+ protected PinotRealtimeSource _pinotRealtimeSource;
public MeetupRsvpStream()
throws Exception {
@@ -60,94 +41,42 @@ public class MeetupRsvpStream {
public MeetupRsvpStream(boolean partitionByKey)
throws Exception {
- _partitionByKey = partitionByKey;
+ // calling this constructor means that we wish to use EVENT_ID as the key; RSVP_ID was used by the removed MeetupRsvpJsonStream
+ this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE);
+ }
+ public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
+ throws Exception {
Properties properties = new Properties();
properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
properties.put("request.required.acks", "1");
- _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
- _source = new Source(createConsumer());
+ StreamDataProducer producer =
+ StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+ _pinotRealtimeSource =
+ PinotRealtimeSource.builder().setGenerator(new RsvpSourceGenerator(keyColumn)).setProducer(producer)
+ .setRateLimiter(permits -> {
+ int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Interrupted from sleep but will continue", ex);
+ }
+ })
+ .setTopic(_topicName)
+ .build();
}
public void run()
throws Exception {
- _source.start();
+ _pinotRealtimeSource.run();
}
public void stopPublishing() {
- _producer.close();
- _source.close();
- }
-
- protected Consumer<RSVP> createConsumer() {
- return message -> {
- try {
- if (_partitionByKey) {
- _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),
- message.getPayload().toString().getBytes(UTF_8));
- } else {
- _producer.produce(_topicName, message.getPayload().toString().getBytes(UTF_8));
- }
- } catch (Exception e) {
- LOGGER.error("Caught exception while processing the message: {}", message, e);
- }
- };
- }
-
- private static class Source implements AutoCloseable, Runnable {
-
- private final Consumer<RSVP> _consumer;
-
- private final ExecutorService _executorService = Executors.newSingleThreadExecutor();
- private volatile Future<?> _future;
-
- private Source(Consumer<RSVP> consumer) {
- _consumer = consumer;
- }
-
- @Override
- public void close() {
- if (_future != null) {
- _future.cancel(true);
- }
- _executorService.shutdownNow();
- }
-
- public void start() {
- _future = _executorService.submit(this);
- }
-
- @Override
- public void run() {
- while (!Thread.interrupted()) {
- try {
- RSVP rsvp = createMessage();
- _consumer.accept(rsvp);
- int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private RSVP createMessage() {
- String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
- ObjectNode json = JsonUtils.newObjectNode();
- json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
- json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
- json.put("event_id", eventId);
- json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
- json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
- json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
- json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
- json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
- json.put("group_lat", ThreadLocalRandom.current().nextFloat());
- json.put("group_lon", ThreadLocalRandom.current().nextFloat());
- json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
- json.put("rsvp_count", 1);
- return new RSVP(eventId, eventId, json);
+ try {
+ _pinotRealtimeSource.close();
+ } catch (Exception ex) {
+ LOGGER.error("Failed to close real time source. ignored and continue", ex);
}
}
}
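
A note on the rate limiter lambda above: drawing floor(log(u) / log(0.999)) + 1 for u uniform in (0, 1) is inverse-CDF sampling of a geometric distribution with success probability 1 - 0.999 = 0.001, so the sleep between messages averages about 1000 ms. This preserves the roughly one-RSVP-per-second pacing of the Source loop removed by this diff.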
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
new file mode 100644
index 0000000000..87c527f9e1
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot real-time source that is capable of
+ * 1. Running forever
+ * 2. Pulling from a generator and writing into a StreamDataProducer
+ * The source runs its loop on a dedicated thread.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+ public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+ public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+ public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+ public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+ final StreamDataProducer _producer;
+ final PinotSourceDataGenerator _generator;
+ final String _topicName;
+ final ExecutorService _executor;
+ final Properties _properties;
+ PinotStreamRateLimiter _rateLimiter;
+ protected volatile boolean _shutdown;
+
+ /**
+ * Constructs a source by passing in a Properties object, a generator, and a producer
+ * @param settings the settings for all components passed in
+ * @param generator the generator that can create data
+ * @param producer the producer to write the generator's data into
+ */
+ public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer) {
+ this(settings, generator, producer, null, null);
+ }
+
+ /**
+ * Constructs a source by passing in a Properties object, a generator, a producer, and an executor service
+ * @param settings the settings for all components passed in
+ * @param generator the generator that can create data
+ * @param producer the producer to write the generator's data into
+ * @param executor the preferred executor to use instead of creating a thread pool. Null for the default one
+ * @param rateLimiter a specialized rate limiter for customization. Null for the default Guava-based one
+ */
+ public PinotRealtimeSource(Properties settings, PinotSourceDataGenerator generator, StreamDataProducer producer,
+ @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+ _properties = settings;
+ _producer = producer;
+ Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+ _generator = generator;
+ Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+ _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+ _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+ Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+ _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+ }
+
+ public void run() {
+ _executor.execute(() -> {
+ while (!_shutdown) {
+ List<StreamDataProducer.RowWithKey> rows = _generator.generateRows();
+ // we expect the generator implementation to return empty rows when there is no data available
+ // as a stream, we expect data to be available all the time
+ if (rows.isEmpty()) {
+ try {
+ Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+ } catch (InterruptedException ex) {
+ LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+ }
+ } else {
+ _rateLimiter.acquire(rows.size());
+ if (!_shutdown) {
+ _producer.produceKeyedBatch(_topicName, rows);
+ }
+ }
+ }
+ });
+ }
+
+ @Override
+ public void close() throws Exception {
+ _shutdown = true;
+ _generator.close();
+ _producer.close();
+ _executor.shutdownNow();
+ }
+
+ /**
+ * A simple wrapper around the Guava-based rate limiter
+ */
+ private static class GuavaRateLimiter implements PinotStreamRateLimiter {
+ private final RateLimiter _rateLimiter;
+ public GuavaRateLimiter(long maxQps) {
+ _rateLimiter = RateLimiter.create(maxQps);
+ }
+ @Override
+ public void acquire(int permits) {
+ _rateLimiter.acquire(permits);
+ }
+ }
+
+ static long extractMaxQps(Properties settings) {
+ String qpsStr = settings.getProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(DEFAULT_MAX_MESSAGE_PER_SECOND));
+ long maxQps = DEFAULT_MAX_MESSAGE_PER_SECOND;
+ try {
+ maxQps = Long.parseLong(qpsStr);
+ } catch (NumberFormatException ex) {
+ LOGGER.warn("Cannot parse {} as max qps setting, using default {}", qpsStr, DEFAULT_MAX_MESSAGE_PER_SECOND);
+ }
+ return maxQps;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private String _topic;
+ private long _maxMessagePerSecond;
+ private PinotSourceDataGenerator _generator;
+ private StreamDataProducer _producer;
+ private ExecutorService _executor;
+ private PinotStreamRateLimiter _rateLimiter;
+ public Builder setTopic(String topic) {
+ _topic = topic;
+ return this;
+ }
+
+ public Builder setMaxMessagePerSecond(long maxMessagePerSecond) {
+ _maxMessagePerSecond = maxMessagePerSecond;
+ return this;
+ }
+
+ public Builder setGenerator(PinotSourceDataGenerator generator) {
+ _generator = generator;
+ return this;
+ }
+
+ public Builder setProducer(StreamDataProducer producer) {
+ _producer = producer;
+ return this;
+ }
+
+ public Builder setExecutor(ExecutorService executor) {
+ _executor = executor;
+ return this;
+ }
+
+ public Builder setRateLimiter(PinotStreamRateLimiter rateLimiter) {
+ _rateLimiter = rateLimiter;
+ return this;
+ }
+
+ public PinotRealtimeSource build() {
+ Preconditions.checkNotNull(_topic, "PinotRealTimeSource should specify topic name");
+ Properties properties = new Properties();
+ if (_maxMessagePerSecond > 0) {
+ properties.setProperty(KEY_OF_MAX_MESSAGE_PER_SECOND, String.valueOf(_maxMessagePerSecond));
+ }
+ properties.setProperty(KEY_OF_TOPIC_NAME, _topic);
+ return new PinotRealtimeSource(properties, _generator, _producer, _executor, _rateLimiter);
+ }
+ }
+}
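
A minimal end-to-end sketch of the new source (the Kafka wiring mirrors the MeetupRsvpStream constructor above; the rate cap and sleep duration are arbitrary):

    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.spi.stream.StreamDataProvider;
    import org.apache.pinot.tools.streams.PinotRealtimeSource;
    import org.apache.pinot.tools.streams.RsvpSourceGenerator;
    import org.apache.pinot.tools.utils.KafkaStarterUtils;

    public class RealtimeSourceExample {
      public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        StreamDataProducer producer =
            StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
        PinotRealtimeSource source = PinotRealtimeSource.builder()
            .setGenerator(new RsvpSourceGenerator(RsvpSourceGenerator.KeyColumn.EVENT_ID))
            .setProducer(producer)
            .setTopic("meetupRSVPEvents")
            .setMaxMessagePerSecond(100) // enforced by the default Guava-based limiter
            .build();
        source.run();         // starts the producing loop on the internal single-thread executor
        Thread.sleep(10_000); // let it publish for ten seconds
        source.close();       // signals shutdown and closes the generator and producer
      }
    }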
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java
new file mode 100644
index 0000000000..12e15e4cf3
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceDataGenerator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+
+
+/**
+ * Represents one Pinot real-time data source that can constantly generate data.
+ * For example, it can pull a batch from Kafka, or poll some data via HTTP GET.
+ * The generator is driven by PinotRealtimeSource to keep producing into some downstream sink.
+ */
+public interface PinotSourceDataGenerator extends AutoCloseable {
+ /**
+ * Initializes the generator via a Properties object. It will be called at least once.
+ * @param properties the configuration properties
+ */
+ void init(Properties properties);
+
+ /**
+ * Generates a small batch of rows represented as bytes.
+ * It is up to the generator to define the binary format.
+ * @return a small list of RowWithKey; each element of the list will be written as one row of data
+ */
+ List<StreamDataProducer.RowWithKey> generateRows();
+}
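
As an illustration of the contract (this generator is entirely hypothetical), an implementation that emits one JSON counter row per batch could look like:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import org.apache.pinot.spi.stream.StreamDataProducer;
    import org.apache.pinot.tools.streams.PinotSourceDataGenerator;

    public class CounterSourceGenerator implements PinotSourceDataGenerator {
      private long _counter;

      @Override
      public void init(Properties properties) {
        // No configuration needed for this sketch.
      }

      @Override
      public List<StreamDataProducer.RowWithKey> generateRows() {
        // A null key means the row is written through the un-keyed produce path.
        byte[] payload = ("{\"counter\":" + _counter++ + "}").getBytes(StandardCharsets.UTF_8);
        return Collections.singletonList(new StreamDataProducer.RowWithKey(null, payload));
      }

      @Override
      public void close() {
        // Nothing to release.
      }
    }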
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
similarity index 70%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
copy to pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
index 6b3c14010d..ef5390acea 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataProducer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotStreamRateLimiter.java
@@ -16,20 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.stream;
-
-import java.util.Properties;
+package org.apache.pinot.tools.streams;
/**
- * StreamDataProducer is the interface for stream data sources. E.g. KafkaDataProducer.
+ * Represents a very simple rate limiter that is used by Pinot
*/
-public interface StreamDataProducer {
- void init(Properties props);
-
- void produce(String topic, byte[] payload);
-
- void produce(String topic, byte[] key, byte[] payload);
-
- void close();
+@FunctionalInterface
+public interface PinotStreamRateLimiter {
+ /**
+ * Blocks the current thread until the requested number of permits is available
+ * @param permits how many permits to acquire
+ */
+ void acquire(int permits);
}
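
Because the interface is a @FunctionalInterface, a custom limiter can be passed to PinotRealtimeSource.Builder.setRateLimiter as a lambda; a sketch of a fixed-delay limiter (the 10 ms pause is arbitrary):

    import org.apache.pinot.tools.streams.PinotStreamRateLimiter;

    public class RateLimiterExample {
      // Sleeps ~10 ms per acquire call; this simple sketch ignores the permit count.
      static final PinotStreamRateLimiter FIXED_DELAY = permits -> {
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      };
    }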
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
new file mode 100644
index 0000000000..a09b6bb6fa
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.streams;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableList;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * A simple random generator that fakes RSVP
+ */
+public class RsvpSourceGenerator implements PinotSourceDataGenerator {
+ private final KeyColumn _keyColumn;
+ public static final DateTimeFormatter DATE_TIME_FORMATTER =
+ new DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral(' ')
+ .append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter();
+
+ public RsvpSourceGenerator(KeyColumn keyColumn) {
+ _keyColumn = keyColumn;
+ }
+
+ public RSVP createMessage() {
+ String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+ ObjectNode json = JsonUtils.newObjectNode();
+ json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
+ json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
+ json.put("event_id", eventId);
+ json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+ json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
+ json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
+ json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
+ json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
+ json.put("group_lat", ThreadLocalRandom.current().nextFloat());
+ json.put("group_lon", ThreadLocalRandom.current().nextFloat());
+ json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+ json.put("rsvp_count", 1);
+ return new RSVP(eventId, eventId, json);
+ }
+
+ @Override
+ public void init(Properties properties) {
+ }
+
+ @Override
+ public List<StreamDataProducer.RowWithKey> generateRows() {
+ RSVP msg = createMessage();
+ byte[] key;
+ switch (_keyColumn) {
+ case EVENT_ID:
+ key = msg.getEventId().getBytes(UTF_8);
+ break;
+ case RSVP_ID:
+ key = msg.getRsvpId().getBytes(UTF_8);
+ break;
+ default:
+ key = null;
+ break;
+ }
+ return ImmutableList.of(new StreamDataProducer.RowWithKey(key, msg.getPayload().toString().getBytes(UTF_8)));
+ }
+
+ @Override
+ public void close()
+ throws Exception {
+ }
+ public enum KeyColumn {
+ NONE,
+ EVENT_ID,
+ RSVP_ID
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java
new file mode 100644
index 0000000000..3d1b6708b4
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/GithubPullRequestSourceGenerator.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams.githubevents;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.tools.QuickStartBase;
+import org.apache.pinot.tools.Quickstart;
+import org.apache.pinot.tools.streams.PinotSourceDataGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The class that pulls events from GitHub via API calls, and converts them into byte[] so we can write to Kafka
+ */
+public class GithubPullRequestSourceGenerator implements PinotSourceDataGenerator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GithubPullRequestSourceGenerator.class);
+ private static final long SLEEP_MILLIS = 10_000;
+
+ private GitHubAPICaller _gitHubAPICaller;
+ private Schema _avroSchema;
+ private String _etag = null;
+
+ public GithubPullRequestSourceGenerator(File schemaFile, String personalAccessToken)
+ throws Exception {
+ try {
+ _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile));
+ } catch (Exception e) {
+ LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]");
+ throw e;
+ }
+ _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
+ }
+
+ private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
+ throws IOException {
+ GenericRecord genericRecord = null;
+ String type = event.get("type").asText();
+
+ if ("PullRequestEvent".equals(type)) {
+ JsonNode payload = event.get("payload");
+ if (payload != null) {
+ String action = payload.get("action").asText();
+ JsonNode pullRequest = payload.get("pull_request");
+ String merged = pullRequest.get("merged").asText();
+ if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event
+
+ JsonNode commits = null;
+ String commitsURL = pullRequest.get("commits_url").asText();
+ GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL);
+
+ if (commitsResponse._responseString != null) {
+ commits = JsonUtils.stringToJsonNode(commitsResponse._responseString);
+ }
+
+ JsonNode reviewComments = null;
+ String reviewCommentsURL = pullRequest.get("review_comments_url").asText();
+ GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL);
+ if (reviewCommentsResponse._responseString != null) {
+ reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString);
+ }
+
+ JsonNode comments = null;
+ String commentsURL = pullRequest.get("comments_url").asText();
+ GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL);
+ if (commentsResponse._responseString != null) {
+ comments = JsonUtils.stringToJsonNode(commentsResponse._responseString);
+ }
+
+ // get PullRequestMergeEvent
+ PullRequestMergedEvent pullRequestMergedEvent =
+ new PullRequestMergedEvent(event, commits, reviewComments, comments);
+ // make generic record
+ genericRecord = convertToGenericRecord(pullRequestMergedEvent);
+ }
+ }
+ }
+ return genericRecord;
+ }
+
+ /**
+ * Convert the PullRequestMergedEvent to a GenericRecord
+ */
+ private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) {
+ GenericRecord genericRecord = new GenericData.Record(_avroSchema);
+
+ // Dimensions
+ genericRecord.put("title", pullRequestMergedEvent.getTitle());
+ genericRecord.put("labels", pullRequestMergedEvent.getLabels());
+ genericRecord.put("userId", pullRequestMergedEvent.getUserId());
+ genericRecord.put("userType", pullRequestMergedEvent.getUserType());
+ genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation());
+ genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy());
+ genericRecord.put("assignees", pullRequestMergedEvent.getAssignees());
+ genericRecord.put("committers", pullRequestMergedEvent.getCommitters());
+ genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers());
+ genericRecord.put("commenters", pullRequestMergedEvent.getCommenters());
+ genericRecord.put("authors", pullRequestMergedEvent.getAuthors());
+ genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers());
+ genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams());
+ genericRecord.put("repo", pullRequestMergedEvent.getRepo());
+ genericRecord.put("organization", pullRequestMergedEvent.getOrganization());
+
+ // Metrics
+ genericRecord.put("numComments", pullRequestMergedEvent.getNumComments());
+ genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments());
+ genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits());
+ genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded());
+ genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted());
+ genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged());
+ genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers());
+ genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters());
+ genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters());
+ genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors());
+ genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis());
+ genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis());
+
+ // Time column
+ genericRecord.put("mergedTimeMillis", pullRequestMergedEvent.getMergedTimeMillis());
+
+ return genericRecord;
+ }
+
+ @Override
+ public void init(Properties properties) {
+ }
+
+ @Override
+ public List<StreamDataProducer.RowWithKey> generateRows() {
+ List<StreamDataProducer.RowWithKey> retVal = new ArrayList<>();
+ try {
+ GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(_etag);
+ switch (githubAPIResponse._statusCode) {
+ case 200: // Read new events
+ _etag = githubAPIResponse._etag;
+ JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString);
+ for (JsonNode eventElement : jsonArray) {
+ try {
+ GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement);
+ if (genericRecord != null) {
+ QuickStartBase.printStatus(Quickstart.Color.CYAN, genericRecord.toString());
+ retVal.add(
+ new StreamDataProducer.RowWithKey(null, genericRecord.toString().getBytes(StandardCharsets.UTF_8)));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception in publishing generic record. Skipping", e);
+ }
+ }
+ break;
+ case 304: // Not Modified
+ Quickstart.printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s.");
+ Thread.sleep(SLEEP_MILLIS);
+ break;
+ case 408: // Timeout
+ Quickstart.printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s.");
+ Thread.sleep(SLEEP_MILLIS);
+ break;
+ case 403: // Rate Limit exceeded
+ Quickstart.printStatus(Quickstart.Color.YELLOW,
+ "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs);
+ long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis());
+ Thread.sleep(sleepMs);
+ break;
+ case 401: // Unauthorized
+ String msg = "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage
+ + ". Exiting.";
+ Quickstart.printStatus(Quickstart.Color.YELLOW, msg);
+ throw new RuntimeException(msg);
+ default: // Unknown status code
+ Quickstart.printStatus(Quickstart.Color.YELLOW,
+ "Unknown status code " + githubAPIResponse._statusCode + " statusMessage "
+ + githubAPIResponse._statusMessage + ". Retry in 10s");
+ Thread.sleep(SLEEP_MILLIS);
+ break;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception in reading events data", e);
+ try {
+ Thread.sleep(SLEEP_MILLIS);
+ } catch (InterruptedException ex) {
+ LOGGER.error("Caught exception in retry", ex);
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public void close()
+ throws Exception {
+ _gitHubAPICaller.shutdown();
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index a1703d8f6f..950af0d260 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -18,24 +18,16 @@
*/
package org.apache.pinot.tools.streams.githubevents;
-import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import java.io.File;
-import java.io.IOException;
import java.net.URL;
-import java.nio.charset.StandardCharsets;
import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
-import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Quickstart;
+import org.apache.pinot.tools.streams.PinotRealtimeSource;
+import org.apache.pinot.tools.streams.PinotSourceDataGenerator;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.apache.pinot.tools.utils.KinesisStarterUtils;
import org.apache.pinot.tools.utils.StreamSourceType;
@@ -53,38 +45,26 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
*/
public class PullRequestMergedEventsStream {
private static final Logger LOGGER = LoggerFactory.getLogger(PullRequestMergedEventsStream.class);
- private static final long SLEEP_MILLIS = 10_000;
- private final ExecutorService _service;
- private boolean _keepStreaming = true;
-
- private final Schema _avroSchema;
- private final String _topicName;
- private final GitHubAPICaller _gitHubAPICaller;
-
- private StreamDataProducer _producer;
+ private PinotRealtimeSource _pinotStream;
public PullRequestMergedEventsStream(File schemaFile, String topicName, String personalAccessToken,
StreamDataProducer producer)
throws Exception {
- _service = Executors.newFixedThreadPool(2);
- try {
- _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(schemaFile));
- } catch (Exception e) {
- LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFile.getName() + "]");
- throw e;
- }
- _topicName = topicName;
- _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
- _producer = producer;
+ PinotSourceDataGenerator generator = new GithubPullRequestSourceGenerator(schemaFile, personalAccessToken);
+ _pinotStream =
+ PinotRealtimeSource.builder().setProducer(producer).setGenerator(generator).setTopic(topicName).build();
}
public PullRequestMergedEventsStream(String schemaFilePath, String topicName, String personalAccessToken,
StreamDataProducer producer)
throws Exception {
- _service = Executors.newFixedThreadPool(2);
+ this(getSchemaFile(schemaFilePath), topicName, personalAccessToken, producer);
+ }
+
+ public static File getSchemaFile(String schemaFilePath) {
+ File pinotSchema;
try {
- File pinotSchema;
if (schemaFilePath == null) {
ClassLoader classLoader = PullRequestMergedEventsStream.class.getClassLoader();
URL resource = classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json");
@@ -93,14 +73,11 @@ public class PullRequestMergedEventsStream {
} else {
pinotSchema = new File(schemaFilePath);
}
- _avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(org.apache.pinot.spi.data.Schema.fromFile(pinotSchema));
} catch (Exception e) {
LOGGER.error("Got exception while reading Pinot schema from file: [" + schemaFilePath + "]");
throw e;
}
- _topicName = topicName;
- _gitHubAPICaller = new GitHubAPICaller(personalAccessToken);
- _producer = producer;
+ return pinotSchema;
}
public static StreamDataProducer getKafkaStreamDataProducer()
@@ -181,187 +158,13 @@ public class PullRequestMergedEventsStream {
* Shuts down the stream.
*/
public void shutdown()
- throws IOException, InterruptedException {
+ throws Exception {
printStatus(Quickstart.Color.GREEN, "***** Shutting down pullRequestMergedEvents Stream *****");
- _keepStreaming = false;
- Thread.sleep(3000L);
- _gitHubAPICaller.shutdown();
- _producer.close();
- _producer = null;
- _service.shutdown();
- }
-
- /**
- * Publishes the message to the kafka topic
- */
- private void publish(GenericRecord message)
- throws IOException {
- if (!_keepStreaming) {
- return;
- }
- _producer.produce(_topicName, message.toString().getBytes(StandardCharsets.UTF_8));
+ _pinotStream.close();
}
public void start() {
-
printStatus(Quickstart.Color.CYAN, "***** Starting pullRequestMergedEvents Stream *****");
-
- _service.submit(() -> {
-
- String etag = null;
- while (true) {
- if (!_keepStreaming) {
- return;
- }
- try {
- GitHubAPICaller.GitHubAPIResponse githubAPIResponse = _gitHubAPICaller.callEventsAPI(etag);
- switch (githubAPIResponse._statusCode) {
- case 200: // Read new events
- etag = githubAPIResponse._etag;
- JsonNode jsonArray = JsonUtils.stringToJsonNode(githubAPIResponse._responseString);
- for (JsonNode eventElement : jsonArray) {
- try {
- GenericRecord genericRecord = convertToPullRequestMergedGenericRecord(eventElement);
- if (genericRecord != null) {
- printStatus(Quickstart.Color.CYAN, genericRecord.toString());
- publish(genericRecord);
- }
- } catch (Exception e) {
- LOGGER.error("Exception in publishing generic record. Skipping", e);
- }
- }
- break;
- case 304: // Not Modified
- printStatus(Quickstart.Color.YELLOW, "Not modified. Checking again in 10s.");
- Thread.sleep(SLEEP_MILLIS);
- break;
- case 408: // Timeout
- printStatus(Quickstart.Color.YELLOW, "Timeout. Trying again in 10s.");
- Thread.sleep(SLEEP_MILLIS);
- break;
- case 403: // Rate Limit exceeded
- printStatus(Quickstart.Color.YELLOW,
- "Rate limit exceeded, sleeping until " + githubAPIResponse._resetTimeMs);
- long sleepMs = Math.max(60_000L, githubAPIResponse._resetTimeMs - System.currentTimeMillis());
- Thread.sleep(sleepMs);
- break;
- case 401: // Unauthorized
- printStatus(Quickstart.Color.YELLOW,
- "Unauthorized call to GitHub events API. Status message: " + githubAPIResponse._statusMessage
- + ". Exiting.");
- return;
- default: // Unknown status code
- printStatus(Quickstart.Color.YELLOW,
- "Unknown status code " + githubAPIResponse._statusCode + " statusMessage "
- + githubAPIResponse._statusMessage + ". Retry in 10s");
- Thread.sleep(SLEEP_MILLIS);
- break;
- }
- } catch (Exception e) {
- LOGGER.error("Exception in reading events data", e);
- try {
- Thread.sleep(SLEEP_MILLIS);
- } catch (InterruptedException ex) {
- LOGGER.error("Caught exception in retry", ex);
- }
- }
- }
- });
- }
-
- /**
- * Checks for events of type PullRequestEvent which have action = closed and merged = true.
- * Find commits, review comments, comments corresponding to this pull request event.
- * Construct a PullRequestMergedEvent with the help of the event, commits, review comments and comments.
- * Converts PullRequestMergedEvent to GenericRecord
- * @param event
- */
- private GenericRecord convertToPullRequestMergedGenericRecord(JsonNode event)
- throws IOException {
- GenericRecord genericRecord = null;
- String type = event.get("type").asText();
-
- if ("PullRequestEvent".equals(type)) {
- JsonNode payload = event.get("payload");
- if (payload != null) {
- String action = payload.get("action").asText();
- JsonNode pullRequest = payload.get("pull_request");
- String merged = pullRequest.get("merged").asText();
- if ("closed".equals(action) && "true".equals(merged)) { // valid pull request merge event
-
- JsonNode commits = null;
- String commitsURL = pullRequest.get("commits_url").asText();
- GitHubAPICaller.GitHubAPIResponse commitsResponse = _gitHubAPICaller.callAPI(commitsURL);
-
- if (commitsResponse._responseString != null) {
- commits = JsonUtils.stringToJsonNode(commitsResponse._responseString);
- }
-
- JsonNode reviewComments = null;
- String reviewCommentsURL = pullRequest.get("review_comments_url").asText();
- GitHubAPICaller.GitHubAPIResponse reviewCommentsResponse = _gitHubAPICaller.callAPI(reviewCommentsURL);
- if (reviewCommentsResponse._responseString != null) {
- reviewComments = JsonUtils.stringToJsonNode(reviewCommentsResponse._responseString);
- }
-
- JsonNode comments = null;
- String commentsURL = pullRequest.get("comments_url").asText();
- GitHubAPICaller.GitHubAPIResponse commentsResponse = _gitHubAPICaller.callAPI(commentsURL);
- if (commentsResponse._responseString != null) {
- comments = JsonUtils.stringToJsonNode(commentsResponse._responseString);
- }
-
- // get PullRequestMergeEvent
- PullRequestMergedEvent pullRequestMergedEvent =
- new PullRequestMergedEvent(event, commits, reviewComments, comments);
- // make generic record
- genericRecord = convertToGenericRecord(pullRequestMergedEvent);
- }
- }
- }
- return genericRecord;
- }
-
- /**
- * Convert the PullRequestMergedEvent to a GenericRecord
- */
- private GenericRecord convertToGenericRecord(PullRequestMergedEvent pullRequestMergedEvent) {
- GenericRecord genericRecord = new GenericData.Record(_avroSchema);
-
- // Dimensions
- genericRecord.put("title", pullRequestMergedEvent.getTitle());
- genericRecord.put("labels", pullRequestMergedEvent.getLabels());
- genericRecord.put("userId", pullRequestMergedEvent.getUserId());
- genericRecord.put("userType", pullRequestMergedEvent.getUserType());
- genericRecord.put("authorAssociation", pullRequestMergedEvent.getAuthorAssociation());
- genericRecord.put("mergedBy", pullRequestMergedEvent.getMergedBy());
- genericRecord.put("assignees", pullRequestMergedEvent.getAssignees());
- genericRecord.put("committers", pullRequestMergedEvent.getCommitters());
- genericRecord.put("reviewers", pullRequestMergedEvent.getReviewers());
- genericRecord.put("commenters", pullRequestMergedEvent.getCommenters());
- genericRecord.put("authors", pullRequestMergedEvent.getAuthors());
- genericRecord.put("requestedReviewers", pullRequestMergedEvent.getRequestedReviewers());
- genericRecord.put("requestedTeams", pullRequestMergedEvent.getRequestedTeams());
- genericRecord.put("repo", pullRequestMergedEvent.getRepo());
- genericRecord.put("organization", pullRequestMergedEvent.getOrganization());
-
- // Metrics
- genericRecord.put("numComments", pullRequestMergedEvent.getNumComments());
- genericRecord.put("numReviewComments", pullRequestMergedEvent.getNumReviewComments());
- genericRecord.put("numCommits", pullRequestMergedEvent.getNumCommits());
- genericRecord.put("numLinesAdded", pullRequestMergedEvent.getNumLinesAdded());
- genericRecord.put("numLinesDeleted", pullRequestMergedEvent.getNumLinesDeleted());
- genericRecord.put("numFilesChanged", pullRequestMergedEvent.getNumFilesChanged());
- genericRecord.put("numReviewers", pullRequestMergedEvent.getNumReviewers());
- genericRecord.put("numCommenters", pullRequestMergedEvent.getNumCommenters());
- genericRecord.put("numCommitters", pullRequestMergedEvent.getNumCommitters());
- genericRecord.put("numAuthors", pullRequestMergedEvent.getNumAuthors());
- genericRecord.put("createdTimeMillis", pullRequestMergedEvent.getCreatedTimeMillis());
- genericRecord.put("elapsedTimeMillis", pullRequestMergedEvent.getElapsedTimeMillis());
-
- // Time column
- genericRecord.put("mergedTimeMillis", pullRequestMergedEvent.getMergedTimeMillis());
-
- return genericRecord;
+ _pinotStream.run();
}
}
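After this refactor the stream is a thin facade: the constructor wires a GithubPullRequestSourceGenerator into a PinotRealtimeSource, and start()/shutdown() simply delegate to run()/close(). A hedged sketch of how a caller might drive it end to end, assuming a local Kafka cluster is already running; the topic name, token argument, and one-minute run window are placeholders, not values from this commit:

import java.io.File;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;

public final class PullRequestStreamDemo {
  public static void main(String[] args) throws Exception {
    // A null path falls back to the bundled pullRequestMergedEvents_schema.json resource.
    File schema = PullRequestMergedEventsStream.getSchemaFile(null);
    StreamDataProducer producer = PullRequestMergedEventsStream.getKafkaStreamDataProducer();
    PullRequestMergedEventsStream stream = new PullRequestMergedEventsStream(
        schema, "pullRequestMergedEvents", args[0] /* GitHub personal access token */, producer);
    stream.start();        // delegates to PinotRealtimeSource.run()
    Thread.sleep(60_000L); // placeholder run window
    stream.shutdown();     // delegates to PinotRealtimeSource.close()
  }
}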
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java
new file mode 100644
index 0000000000..14f915d62a
--- /dev/null
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/streams/PinotRealtimeSourceTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.streams;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PinotRealtimeSourceTest {
+
+ @Test
+ public void testBuilder() {
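+ // Minimal build first (topic, producer, generator only), then a fully-specified one.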
+ StreamDataProducer producer = Mockito.mock(StreamDataProducer.class);
+ PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class);
+ PinotRealtimeSource realtimeSource =
+ PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).setGenerator(generator).build();
+ Assert.assertNotNull(realtimeSource);
+
+ PinotStreamRateLimiter limiter = Mockito.mock(PinotStreamRateLimiter.class);
+ ExecutorService executorService = Mockito.mock(ExecutorService.class);
+ realtimeSource = PinotRealtimeSource.builder().setRateLimiter(limiter).setProducer(producer).setGenerator(generator)
+ .setTopic("mytopic").setExecutor(executorService).setMaxMessagePerSecond(9527).build();
+ Assert.assertEquals(realtimeSource._executor, executorService);
+ Assert.assertEquals(realtimeSource._producer, producer);
+ Assert.assertEquals(realtimeSource._topicName, "mytopic");
+ String qps = realtimeSource._properties.get(PinotRealtimeSource.KEY_OF_MAX_MESSAGE_PER_SECOND).toString();
+ Assert.assertNotNull(qps);
+ Assert.assertEquals(qps, "9527");
+ Assert.assertEquals(realtimeSource._rateLimiter, limiter);
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testBuilderNoNullProducerThrowExceptions() {
+ PinotSourceDataGenerator generator = Mockito.mock(PinotSourceDataGenerator.class);
+ PinotRealtimeSource realtimeSource =
+ PinotRealtimeSource.builder().setTopic("mytopic").setGenerator(generator).build();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testBuilderNoNullGeneratorThrowExceptions() {
+ StreamDataProducer producer = Mockito.mock(StreamDataProducer.class);
+ PinotRealtimeSource realtimeSource =
+ PinotRealtimeSource.builder().setTopic("mytopic").setProducer(producer).build();
+ }
+}
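The test above doubles as a contract for the builder: producer and generator are required (omitting either fails fast with a NullPointerException), while the rate limiter, executor, and per-second message cap are optional, with the cap surfacing as a string property under KEY_OF_MAX_MESSAGE_PER_SECOND. A sketch of a fully-specified build using only setters exercised by this test; defaults for omitted setters are not visible in this diff:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.tools.streams.PinotRealtimeSource;
import org.apache.pinot.tools.streams.PinotSourceDataGenerator;

public final class BuilderSketch {
  static PinotRealtimeSource build(StreamDataProducer producer, PinotSourceDataGenerator generator) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    return PinotRealtimeSource.builder()
        .setTopic("mytopic")
        .setProducer(producer)        // required: build() throws NullPointerException without it
        .setGenerator(generator)      // required: build() throws NullPointerException without it
        .setExecutor(executor)        // optional
        .setMaxMessagePerSecond(100)  // optional; stored under KEY_OF_MAX_MESSAGE_PER_SECOND
        .build();
  }
}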