Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/19 22:36:32 UTC

[GitHub] [pinot] dongxiaoman opened a new pull request, #8567: [draft] Refactor quickstart data source

dongxiaoman opened a new pull request, #8567:
URL: https://github.com/apache/pinot/pull/8567

   Instructions:
   1. The PR has to be tagged with at least one of the following labels (*):
      1. `feature`
      2. `bugfix`
      3. `performance`
      4. `ui`
      5. `backward-incompat`
      6. `release-notes` (**)
   2. Remove these instructions before publishing the PR.
    
   (*) Other labels to consider:
   - `testing`
   - `dependencies`
   - `docker`
   - `kubernetes`
   - `observability`
   - `security`
   - `code-style`
   - `extension-point`
   - `refactor`
   - `cleanup`
   
   (**) Use `release-notes` label for scenarios like:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: [draft] Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r855611494


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java:
##########
@@ -60,94 +41,40 @@ public MeetupRsvpStream()
 
   public MeetupRsvpStream(boolean partitionByKey)
       throws Exception {
-    _partitionByKey = partitionByKey;
+    // calling this constructor means that we wish to use EVENT_ID as key. RsvpId is used by MeetupRsvpJsonStream
+    this(partitionByKey ? RsvpSourceGenerator.KeyColumn.EVENT_ID : RsvpSourceGenerator.KeyColumn.NONE);
+  }
 
+  public MeetupRsvpStream(RsvpSourceGenerator.KeyColumn keyColumn)
+      throws Exception {
     Properties properties = new Properties();
     properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
-    _producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
-    _source = new Source(createConsumer());
+    StreamDataProducer producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
+    _pinotRealtimeSource =
+        PinotRealtimeSource.builder().setGenerator(new RsvpSourceGenerator(keyColumn)).setProducer(producer)
+            .setRateLimiter(permits -> {
+              int delay = (int) (Math.log(ThreadLocalRandom.current().nextDouble()) / Math.log(0.999)) + 1;
+              try {
+                Thread.sleep(delay);
+              } catch (InterruptedException ex) {
+                LOGGER.warn("Interrupted from sleep but will continue", ex);
+              }
+            }).build();
   }
 
   public void run()
       throws Exception {
-    _source.start();
+    _pinotRealtimeSource.run();
   }
 
   public void stopPublishing() {
-    _producer.close();
-    _source.close();
-  }
-
-  protected Consumer<RSVP> createConsumer() {
-    return message -> {
-      try {
-        if (_partitionByKey) {
-          _producer.produce(_topicName, message.getEventId().getBytes(UTF_8),

Review Comment:
   For reviewer: note that this line is where this class differs from `MeetupRsvpJsonStream`: the key here is `getEventId()`.
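   The rate limiter lambda passed to the builder in the hunk above ignores its `permits` argument and sleeps for `(int) (ln U / ln 0.999) + 1` milliseconds, where U is uniform. That is a geometric delay with per-millisecond probability 0.001, so the mean sleep is 1000 ms. One edge case is worth a look: `ThreadLocalRandom.nextDouble()` can return exactly `0.0`. In that case the quotient is infinite, the `int` cast saturates at `Integer.MAX_VALUE`, and the `+ 1` overflows to a negative value that `Thread.sleep` rejects with an `IllegalArgumentException` (which the `catch (InterruptedException)` does not handle). A minimal sketch that draws U from `(0, 1]` instead; the class and method names are hypothetical, not part of the PR:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper illustrating the random per-message delay used by the quickstart rate limiter. */
final class GeometricDelay {
  // Per-millisecond "stop" probability is 1 - 0.999 = 0.001, so the mean delay is 1 / 0.001 = 1000 ms.
  private static final double LOG_Q = Math.log(0.999);

  private GeometricDelay() {
  }

  /** Maps a uniform sample u in (0, 1] to a geometric delay in milliseconds (always >= 1). */
  static int delayMillis(double u) {
    if (!(u > 0.0 && u <= 1.0)) { // also rejects NaN
      throw new IllegalArgumentException("u must be in (0, 1], got " + u);
    }
    // ln(u) <= 0 and LOG_Q < 0, so the quotient is non-negative and the int cast floors it.
    return (int) (Math.log(u) / LOG_Q) + 1;
  }

  /** Draws u from (0, 1] so that Math.log(u) is always finite and the result never overflows. */
  static int sampleDelayMillis() {
    return delayMillis(1.0 - ThreadLocalRandom.current().nextDouble());
  }
}
```

   With this helper, the lambda body would reduce to `Thread.sleep(GeometricDelay.sampleDelayMillis())` inside the same try/catch.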





[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: [draft] Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r855611007


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpJsonStream.java:
##########
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tools.streams;
-
-import java.util.function.Consumer;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-
-public class MeetupRsvpJsonStream extends MeetupRsvpStream {
-
-  public MeetupRsvpJsonStream()
-      throws Exception {
-    super();
-  }
-
-  public MeetupRsvpJsonStream(boolean partitionByKey)
-      throws Exception {
-    super(partitionByKey);
-  }
-
-  @Override
-  protected Consumer<RSVP> createConsumer() {
-    return message -> {
-      if (_partitionByKey) {
-        try {
-          _producer.produce(_topicName, message.getRsvpId().getBytes(UTF_8), message.getPayload().toString()

Review Comment:
   For reviewer: this line is the only difference from its parent class: the partitioning key uses `getRsvpId()` instead of `getEventId()`. So in this refactoring I made the generator's partition key configurable and removed this file.
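   The refactoring replaces a subclass override with configuration: the key is chosen by a `KeyColumn` value rather than by which class is instantiated. Below is a minimal sketch of that idea. `EVENT_ID` and `NONE` appear in the diff above; the `RSVP_ID` constant, the `Rsvp` class, and the `keyFor`/`fromPartitionByKey` helpers are hypothetical names for illustration, not the PR's actual code.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the RSVP message; only the two candidate key fields matter here. */
final class Rsvp {
  final String eventId;
  final String rsvpId;

  Rsvp(String eventId, String rsvpId) {
    this.eventId = eventId;
    this.rsvpId = rsvpId;
  }
}

/** Sketch of a configurable partition key (RSVP_ID is an assumed constant name). */
enum KeyColumn {
  NONE, EVENT_ID, RSVP_ID;

  /** Mirrors the old boolean constructor: partitionByKey == true meant "key by event id". */
  static KeyColumn fromPartitionByKey(boolean partitionByKey) {
    return partitionByKey ? EVENT_ID : NONE;
  }

  /** Returns the stream key bytes for a message, or null when messages are not keyed. */
  byte[] keyFor(Rsvp rsvp) {
    switch (this) {
      case EVENT_ID:
        return rsvp.eventId.getBytes(StandardCharsets.UTF_8);
      case RSVP_ID:
        return rsvp.rsvpId.getBytes(StandardCharsets.UTF_8);
      case NONE:
      default:
        return null;
    }
  }
}
```

   Under this sketch, the old `MeetupRsvpJsonStream(true)` would correspond to a generator configured with `RSVP_ID`, so no subclass is needed.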





[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r857821451


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.RowWithKey;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot Real Time Source that is capable of
+ * 1. Keep running forever
+ * 2. Pull from generator and write into StreamDataProducer
+ * The Source has a thread that is looping forever.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+  public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+  public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+  public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+  public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+  final StreamDataProducer _producer;
+  final PinotSourceGenerator _generator;
+  final String _topicName;
+  final ExecutorService _executor;
+  final Properties _properties;
+  PinotStreamRateLimiter _rateLimiter;
+  protected volatile boolean _shutdown;
+
+  /**
+   * Constructs a source by passing in a Properties file, a generator, and a producer
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer) {
+    this(settings, generator, producer, null, null);
+  }
+
+  /**
+   * Constructs a source by passing in properties file, a generator, a producer and an executor service
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   * @param executor the preferred executor instead of creating a thread pool. Null for default one
+   * @param rateLimiter the specialized rate limiter for customization. Null for default guava one
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer,
+      @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+    _properties = settings;
+    _producer = producer;
+    Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+    _generator = generator;
+    Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+    _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+    _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+    Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+    _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+  }
+
+  public void run() {
+    _executor.execute(() -> {
+      while (!_shutdown) {
+        List<RowWithKey> rows = _generator.generateRows();
+        // we expect the generator implementation to return empty rows when there is no data available
+        // as a stream, we expect data to be available all the time
+        if (rows.isEmpty()) {
+          try {
+            Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+          } catch (InterruptedException ex) {
+            LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+          }
+        } else {
+          _rateLimiter.acquire(rows.size());
+          if (!_shutdown) {
+            _producer.produceKeyedBatch(_topicName, rows);

Review Comment:
   The `default void produceKeyedBatch()` method is introduced so that producers have a batch interface as well. If a stream producer makes one RPC per produced message (e.g., if `_producer` is behind a microservice endpoint), batching can improve throughput: that `StreamDataProducer` implementation can override `produceKeyedBatch` to send the whole batch in a single, more efficient call.
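   A minimal sketch of the default-method pattern described above, assuming a simplified producer interface. `KeyedRow`, `SimpleProducer`, and `RecordingBatchProducer` are hypothetical stand-ins for `RowWithKey` and `StreamDataProducer`, whose full signatures are not shown here. The default implementation calls the per-row `produce` in a loop, so existing producers keep working unchanged; a producer backed by a remote endpoint can override it to send the batch at once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RowWithKey: an optional key plus a payload. */
final class KeyedRow {
  final byte[] key; // may be null for unkeyed rows
  final byte[] payload;

  KeyedRow(byte[] key, byte[] payload) {
    this.key = key;
    this.payload = payload;
  }
}

/** Hypothetical simplified producer interface with a batch default method. */
interface SimpleProducer {
  void produce(String topic, byte[] key, byte[] payload);

  /** Default: one produce() call per row. Implementations may override this to batch. */
  default void produceKeyedBatch(String topic, List<KeyedRow> rows) {
    for (KeyedRow row : rows) {
      produce(topic, row.key, row.payload);
    }
  }
}

/** Example override: counts round trips to show a batch costs one call instead of rows.size(). */
final class RecordingBatchProducer implements SimpleProducer {
  int roundTrips = 0;
  final List<KeyedRow> sent = new ArrayList<>();

  @Override
  public void produce(String topic, byte[] key, byte[] payload) {
    roundTrips++; // imagine one RPC per row
    sent.add(new KeyedRow(key, payload));
  }

  @Override
  public void produceKeyedBatch(String topic, List<KeyedRow> rows) {
    roundTrips++; // imagine a single RPC carrying the whole batch
    sent.addAll(rows);
  }
}
```

   Because the method has a default body, adding it to the interface does not break implementations that only define `produce`.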





[GitHub] [pinot] codecov-commenter commented on pull request #8567: [draft] Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8567:
URL: https://github.com/apache/pinot/pull/8567#issuecomment-1104531670

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8567](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa693bb) into [master](https://codecov.io/gh/apache/pinot/commit/7e060fdbdd02ee1e83cf44655423613010ae607d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7e060fd) will **increase** coverage by `3.34%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8567      +/-   ##
   ============================================
   + Coverage     63.68%   67.03%   +3.34%     
   - Complexity     4224     4226       +2     
   ============================================
     Files          1677     1280     -397     
     Lines         88078    64411   -23667     
     Branches      13354    10111    -3243     
   ============================================
   - Hits          56094    43179   -12915     
   + Misses        27851    18129    -9722     
   + Partials       4133     3103    -1030     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `67.03% <0.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rg/apache/pinot/spi/stream/StreamDataProducer.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL1N0cmVhbURhdGFQcm9kdWNlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...va/org/apache/pinot/core/routing/RoutingTable.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yb3V0aW5nL1JvdXRpbmdUYWJsZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/common/config/NettyConfig.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL05ldHR5Q29uZmlnLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...apache/pinot/common/helix/ExtraInstanceConfig.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vaGVsaXgvRXh0cmFJbnN0YW5jZUNvbmZpZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/pinot/server/access/AccessControlFactory.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL2FjY2Vzcy9BY2Nlc3NDb250cm9sRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...he/pinot/common/messages/SegmentReloadMessage.java](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvU2VnbWVudFJlbG9hZE1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [618 more](https://codecov.io/gh/apache/pinot/pull/8567/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [7e060fd...aa693bb](https://codecov.io/gh/apache/pinot/pull/8567?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: [draft] Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r855612260


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/RsvpSourceGenerator.java:
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.tools.streams;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.ImmutableList;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.pinot.spi.stream.RowWithKey;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * A simple random generator that fakes RSVP
+ */
+public class RsvpSourceGenerator implements PinotSourceGenerator {
+  private final KeyColumn _keyColumn;
+  public static final DateTimeFormatter DATE_TIME_FORMATTER =
+      new DateTimeFormatterBuilder().parseCaseInsensitive().append(DateTimeFormatter.ISO_LOCAL_DATE).appendLiteral(' ')
+          .append(DateTimeFormatter.ISO_LOCAL_TIME).toFormatter();
+
+  public RsvpSourceGenerator(KeyColumn keyColumn) {
+    _keyColumn = keyColumn;
+  }
+
+  public RSVP createMessage() {
+    String eventId = Math.abs(ThreadLocalRandom.current().nextLong()) + "";
+    ObjectNode json = JsonUtils.newObjectNode();
+    json.put("venue_name", "venue_name" + ThreadLocalRandom.current().nextInt());
+    json.put("event_name", "event_name" + ThreadLocalRandom.current().nextInt());
+    json.put("event_id", eventId);
+    json.put("event_time", DATE_TIME_FORMATTER.format(LocalDateTime.now().plusDays(10)));
+    json.put("group_city", "group_city" + ThreadLocalRandom.current().nextInt());
+    json.put("group_country", "group_country" + ThreadLocalRandom.current().nextInt());
+    json.put("group_id", Math.abs(ThreadLocalRandom.current().nextLong()));
+    json.put("group_name", "group_name" + ThreadLocalRandom.current().nextInt());
+    json.put("group_lat", ThreadLocalRandom.current().nextFloat());
+    json.put("group_lon", ThreadLocalRandom.current().nextFloat());
+    json.put("mtime", DATE_TIME_FORMATTER.format(LocalDateTime.now()));
+    json.put("rsvp_count", 1);
+    return new RSVP(eventId, eventId, json);
+  }
+
+  @Override
+  public void init(Properties properties) {
+  }
+
+  @Override
+  public List<RowWithKey> generateRows() {
+    RSVP msg = createMessage();
+    byte[] key;
+    switch (_keyColumn) {

Review Comment:
   For reviewer: `MeetupRsvpStream` and `MeetupRsvpJsonStream` are merged to use this class instead, with `_keyColumn` configuring which key is applied.
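   For context on how a generator is consumed: the `run()` loop in `PinotRealtimeSource` (in the hunk earlier in this thread) pulls rows, sleeps briefly when the generator returns nothing, and otherwise acquires one permit per row and writes the batch. A minimal, single-threaded sketch of one iteration of that loop follows. `RowGenerator`, `Limiter`, `BatchSink`, and `SourceLoop` are hypothetical simplifications, not the PR's types, and rows are plain strings for brevity.

```java
import java.util.List;

/** Hypothetical simplified generator: returns an empty list when no data is available. */
interface RowGenerator {
  List<String> generateRows();
}

/** Hypothetical rate limiter: blocks until the given number of permits is available. */
interface Limiter {
  void acquire(int permits);
}

/** Hypothetical sink standing in for StreamDataProducer#produceKeyedBatch. */
interface BatchSink {
  void produceBatch(String topic, List<String> rows);
}

final class SourceLoop {
  static final long EMPTY_SOURCE_SLEEP_MS = 10;

  private final RowGenerator generator;
  private final Limiter limiter;
  private final BatchSink sink;
  private final String topic;
  private volatile boolean shutdown;

  SourceLoop(RowGenerator generator, Limiter limiter, BatchSink sink, String topic) {
    this.generator = generator;
    this.limiter = limiter;
    this.sink = sink;
    this.topic = topic;
  }

  /** One iteration of the loop; returns the number of rows written. */
  int step() throws InterruptedException {
    List<String> rows = generator.generateRows();
    if (rows.isEmpty()) {
      Thread.sleep(EMPTY_SOURCE_SLEEP_MS); // back off briefly instead of spinning
      return 0;
    }
    limiter.acquire(rows.size()); // one permit per row
    if (shutdown) {
      return 0; // re-check after a possibly long wait before writing
    }
    sink.produceBatch(topic, rows);
    return rows.size();
  }

  /** Runs until close() is called or the thread is interrupted. */
  void run() {
    try {
      while (!shutdown) {
        step();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status and exit
    }
  }

  void close() {
    shutdown = true;
  }
}
```

   Unlike the PR, which logs an interruption and keeps looping until the shutdown flag is set, this sketch treats an interrupt as a stop signal, so `ExecutorService.shutdownNow()` alone would stop it. Either choice is defensible for a quickstart tool.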





[GitHub] [pinot] richardstartin merged pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
richardstartin merged PR #8567:
URL: https://github.com/apache/pinot/pull/8567


[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r857826852


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowWithKey.java:
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.stream;
+
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+
+/**
+ * Helper class so the key and payload can be easily tied together instead of using pair
+ */
+public class RowWithKey {

Review Comment:
   Good point. I moved the class inside `StreamDataProducer` so that it will not be used elsewhere. If people need such a class in the future, they can refactor it out.
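   A minimal sketch of nesting the key/payload holder inside the producer interface, so it is scoped to that interface instead of being a standalone SPI class. `KeyedProducer` and its members are hypothetical, simplified stand-ins; the actual `StreamDataProducer` may differ.

   ```java
   import java.util.Arrays;

   // Hypothetical, simplified producer interface with a nested row holder.
   interface KeyedProducer {
     void produce(String topic, byte[] key, byte[] payload);

     // Nested holder (implicitly public static): ties an optional key to its payload.
     // Callers must write KeyedProducer.RowWithKey, which signals its narrow purpose.
     final class RowWithKey {
       private final byte[] key;     // may be null for unkeyed messages
       private final byte[] payload;

       RowWithKey(byte[] key, byte[] payload) {
         this.key = key;
         this.payload = payload;
       }

       byte[] getKey() {
         return key;
       }

       byte[] getPayload() {
         return payload;
       }

       // Content-based equality, since byte[] uses identity equality by default.
       @Override
       public boolean equals(Object o) {
         if (this == o) {
           return true;
         }
         if (!(o instanceof RowWithKey)) {
           return false;
         }
         RowWithKey other = (RowWithKey) o;
         return Arrays.equals(key, other.key) && Arrays.equals(payload, other.payload);
       }

       @Override
       public int hashCode() {
         return 31 * Arrays.hashCode(key) + Arrays.hashCode(payload);
       }
     }
   }
   ```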



[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r857827090


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceGenerator.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.pinot.tools.streams;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.RowWithKey;
+
+
+/**
+ * Represents one Pinot Real Time Data Source that can constantly generate data
+ * For example it can be pulling a batch from Kafka, or polling some data via HTTP GET
+ * The generator will be driven by PinotRealtimeSource to keep producing into some downstream sink
+ */
+public interface PinotSourceGenerator extends AutoCloseable {

Review Comment:
   Thanks, changed.



[GitHub] [pinot] dongxiaoman commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
dongxiaoman commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r857821451


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.RowWithKey;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot Real Time Source that is capable of
+ * 1. Keep running forever
+ * 2. Pull from generator and write into StreamDataProducer
+ * The Source has a thread that is looping forever.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+  public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+  public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+  public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+  public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+  final StreamDataProducer _producer;
+  final PinotSourceGenerator _generator;
+  final String _topicName;
+  final ExecutorService _executor;
+  final Properties _properties;
+  PinotStreamRateLimiter _rateLimiter;
+  protected volatile boolean _shutdown;
+
+  /**
+   * Constructs a source by passing in a Properties file, a generator, and a producer
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer) {
+    this(settings, generator, producer, null, null);
+  }
+
+  /**
+   * Constructs a source by passing in properties file, a generator, a producer and an executor service
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   * @param executor the preferred executor instead of creating a thread pool. Null for default one
+   * @param rateLimiter the specialized rate limiter for customization. Null for default guava one
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer,
+      @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+    _properties = settings;
+    _producer = producer;
+    Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+    _generator = generator;
+    Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+    _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+    _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+    Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+    _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+  }
+
+  public void run() {
+    _executor.execute(() -> {
+      while (!_shutdown) {
+        List<RowWithKey> rows = _generator.generateRows();
+        // we expect the generator implementation to return empty rows when there is no data available
+        // as a stream, we expect data to be available all the time
+        if (rows.isEmpty()) {
+          try {
+            Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+          } catch (InterruptedException ex) {
+            LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+          }
+        } else {
+          _rateLimiter.acquire(rows.size());
+          if (!_shutdown) {
+            _producer.produceKeyedBatch(_topicName, rows);

Review Comment:
   The `default void produceKeyedBatch()` method is introduced so that we also have a batch interface. With it, rows can easily be produced in batches to improve throughput (e.g., when the `_producer` sits behind a microservice endpoint). In that case, the `StreamDataProducer` implementation can override `produceKeyedBatch` to provide a more efficient call.
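   A minimal sketch of the default-method pattern described above: the interface supplies a per-row fallback for `produceKeyedBatch`, and an implementation that can send many rows in one call overrides it. `SimpleProducer`, `Row`, `PerRowProducer`, and `RecordingBatchProducer` are hypothetical, simplified stand-ins, not the actual Pinot `StreamDataProducer` API.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical, simplified producer interface.
   interface SimpleProducer {
     // Minimal key/payload holder for this sketch; key may be null.
     final class Row {
       final byte[] key;
       final byte[] payload;

       Row(byte[] key, byte[] payload) {
         this.key = key;
         this.payload = payload;
       }
     }

     void produce(String topic, byte[] key, byte[] payload);

     // Default: fall back to one produce() call per row.
     default void produceKeyedBatch(String topic, List<Row> rows) {
       for (Row row : rows) {
         produce(topic, row.key, row.payload);
       }
     }
   }

   // Relies on the default: each row becomes a separate produce() call.
   final class PerRowProducer implements SimpleProducer {
     int singleCalls = 0;

     @Override
     public void produce(String topic, byte[] key, byte[] payload) {
       singleCalls++;
     }
   }

   // Overrides the batch method, e.g. to send all rows in a single request to a
   // remote endpoint. Here it only records one entry per batch.
   final class RecordingBatchProducer implements SimpleProducer {
     int singleCalls = 0;
     final List<Integer> batchSizes = new ArrayList<>();

     @Override
     public void produce(String topic, byte[] key, byte[] payload) {
       singleCalls++;
     }

     @Override
     public void produceKeyedBatch(String topic, List<Row> rows) {
       batchSizes.add(rows.size()); // one "request" per batch
     }
   }
   ```

   With this pattern, existing producers keep working unchanged through the default method, while a batching implementation only needs to override one method.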



[GitHub] [pinot] walterddr commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r858097006


##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.RowWithKey;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot Real Time Source that is capable of
+ * 1. Keep running forever
+ * 2. Pull from generator and write into StreamDataProducer
+ * The Source has a thread that is looping forever.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+  public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+  public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+  public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+  public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+  final StreamDataProducer _producer;
+  final PinotSourceGenerator _generator;
+  final String _topicName;
+  final ExecutorService _executor;
+  final Properties _properties;
+  PinotStreamRateLimiter _rateLimiter;
+  protected volatile boolean _shutdown;
+
+  /**
+   * Constructs a source by passing in a Properties file, a generator, and a producer
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer) {
+    this(settings, generator, producer, null, null);
+  }
+
+  /**
+   * Constructs a source by passing in properties file, a generator, a producer and an executor service
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   * @param executor the preferred executor instead of creating a thread pool. Null for default one
+   * @param rateLimiter the specialized rate limiter for customization. Null for default guava one
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer,
+      @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+    _properties = settings;
+    _producer = producer;
+    Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+    _generator = generator;
+    Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+    _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+    _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+    Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+    _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+  }
+
+  public void run() {
+    _executor.execute(() -> {
+      while (!_shutdown) {
+        List<RowWithKey> rows = _generator.generateRows();
+        // we expect the generator implementation to return empty rows when there is no data available
+        // as a stream, we expect data to be available all the time
+        if (rows.isEmpty()) {
+          try {
+            Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+          } catch (InterruptedException ex) {
+            LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+          }
+        } else {
+          _rateLimiter.acquire(rows.size());
+          if (!_shutdown) {
+            _producer.produceKeyedBatch(_topicName, rows);

Review Comment:
   +1. This is actually very helpful: https://github.com/apache/pinot/issues/8537 is likely caused by batch produce instability.



[GitHub] [pinot] walterddr commented on a diff in pull request #8567: Refactor quickstart data source

Posted by GitBox <gi...@apache.org>.
walterddr commented on code in PR #8567:
URL: https://github.com/apache/pinot/pull/8567#discussion_r856927279


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowWithKey.java:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.pinot.spi.stream;
+
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+
+/**
+ * Helper class so the key and payload can be easily tied together instead of using pair
+ */
+public class RowWithKey {

Review Comment:
   Since this goes into the SPI, we need to be careful to explain its intended usage. Could you add a bit more to the javadoc?
   
   At first glance, I thought it represented a row in a partitioned table along with its partition key.
   
   Also, this seems to be used by pinot-tools, so it might work better to put it in the pinot-tools module as a helper utility (along with the batch produce method).



##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotSourceGenerator.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.pinot.tools.streams;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.spi.stream.RowWithKey;
+
+
+/**
+ * Represents one Pinot Real Time Data Source that can constantly generate data
+ * For example it can be pulling a batch from Kafka, or polling some data via HTTP GET
+ * The generator will be driven by PinotRealtimeSource to keep producing into some downstream sink
+ */
+public interface PinotSourceGenerator extends AutoCloseable {

Review Comment:
   `SourceGenerator` sounds like it generates subclasses of `PinotRealtimeSource`. `PinotSourceDataGenerator` may be a better name.



##########
pinot-tools/src/main/java/org/apache/pinot/tools/streams/PinotRealtimeSource.java:
##########
@@ -0,0 +1,191 @@
+package org.apache.pinot.tools.streams;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.RowWithKey;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Represents one Pinot Real Time Source that is capable of
+ * 1. Keep running forever
+ * 2. Pull from generator and write into StreamDataProducer
+ * The Source has a thread that is looping forever.
+ */
+public class PinotRealtimeSource implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSource.class);
+  public static final String KEY_OF_MAX_MESSAGE_PER_SECOND = "pinot.stream.max.message.per.second";
+  public static final String KEY_OF_TOPIC_NAME = "pinot.topic.name";
+  public static final long DEFAULT_MAX_MESSAGE_PER_SECOND = Long.MAX_VALUE;
+  public static final long DEFAULT_EMPTY_SOURCE_SLEEP_MS = 10;
+  final StreamDataProducer _producer;
+  final PinotSourceGenerator _generator;
+  final String _topicName;
+  final ExecutorService _executor;
+  final Properties _properties;
+  PinotStreamRateLimiter _rateLimiter;
+  protected volatile boolean _shutdown;
+
+  /**
+   * Constructs a source by passing in a Properties file, a generator, and a producer
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer) {
+    this(settings, generator, producer, null, null);
+  }
+
+  /**
+   * Constructs a source by passing in properties file, a generator, a producer and an executor service
+   * @param settings the settings for all components passed in
+   * @param generator the generator that can create data
+   * @param producer the producer to write the generator's data into
+   * @param executor the preferred executor instead of creating a thread pool. Null for default one
+   * @param rateLimiter the specialized rate limiter for customization. Null for default guava one
+   */
+  public PinotRealtimeSource(Properties settings, PinotSourceGenerator generator, StreamDataProducer producer,
+      @Nullable ExecutorService executor, @Nullable PinotStreamRateLimiter rateLimiter) {
+    _properties = settings;
+    _producer = producer;
+    Preconditions.checkNotNull(_producer, "Producer of a stream cannot be null");
+    _generator = generator;
+    Preconditions.checkNotNull(_generator, "Generator of a stream cannot be null");
+    _executor = executor == null ? Executors.newSingleThreadExecutor() : executor;
+    _topicName = settings.getProperty(KEY_OF_TOPIC_NAME);
+    Preconditions.checkNotNull(_topicName, "Topic name needs to be set via " + KEY_OF_TOPIC_NAME);
+    _rateLimiter = rateLimiter == null ? new GuavaRateLimiter(extractMaxQps(settings)) : rateLimiter;
+  }
+
+  public void run() {
+    _executor.execute(() -> {
+      while (!_shutdown) {
+        List<RowWithKey> rows = _generator.generateRows();
+        // we expect the generator implementation to return empty rows when there is no data available
+        // as a stream, we expect data to be available all the time
+        if (rows.isEmpty()) {
+          try {
+            Thread.sleep(DEFAULT_EMPTY_SOURCE_SLEEP_MS);
+          } catch (InterruptedException ex) {
+            LOGGER.warn("Interrupted from sleep, will check shutdown flag later", ex);
+          }
+        } else {
+          _rateLimiter.acquire(rows.size());
+          if (!_shutdown) {
+            _producer.produceKeyedBatch(_topicName, rows);

Review Comment:
   Since we have already consolidated all of the generator/producer usage in this class, let's put the batch operation API here as a helper utility:
   ```
   static void produceBatchRowWithKey(StreamDataProducer producer, String topicName, List<RowWithKey> rows) {
     // ... the default implementation
   }
   ```
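   A minimal sketch of the reviewer's alternative: leave the producer interface unchanged and put the batch loop in a static helper next to the code that drives it. `ProducerLike`, `KeyedRow`, and `BatchProduceUtils` are hypothetical names used only for illustration; the two `produce` overloads are an assumed, simplified producer contract.

   ```java
   import java.util.List;

   // Hypothetical minimal producer contract: only single-message produce calls.
   interface ProducerLike {
     void produce(String topic, byte[] payload);

     void produce(String topic, byte[] key, byte[] payload);
   }

   // Hypothetical key/payload holder; key may be null for unkeyed rows.
   final class KeyedRow {
     final byte[] key;
     final byte[] payload;

     KeyedRow(byte[] key, byte[] payload) {
       this.key = key;
       this.payload = payload;
     }
   }

   final class BatchProduceUtils {
     private BatchProduceUtils() {
     }

     // Static helper: sends each row individually, choosing the keyed or
     // unkeyed overload depending on whether the row has a key.
     static void produceBatchRowWithKey(ProducerLike producer, String topic, List<KeyedRow> rows) {
       for (KeyedRow row : rows) {
         if (row.key == null) {
           producer.produce(topic, row.payload);
         } else {
           producer.produce(topic, row.key, row.payload);
         }
       }
     }
   }
   ```

   Compared with a `default` method on the interface, a static helper keeps the SPI surface unchanged, at the cost that producer implementations cannot override it to batch more efficiently.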


