Posted to commits@pinot.apache.org by "navina (via GitHub)" <gi...@apache.org> on 2023/06/30 21:30:49 UTC

[GitHub] [pinot] navina opened a new pull request, #11017: Remove support for High level consumers in Apache Pinot

navina opened a new pull request, #11017:
URL: https://github.com/apache/pinot/pull/11017

   * The controller will throw a runtime exception during startup if there are HLC tables in the cluster
   * The add-table API will disallow table creation with consumer.type = highlevel
   * Marked StreamLevelConsumer as `@Deprecated` in pinot-spi
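
   The two checks in the description can be sketched with one shared predicate. This is a minimal illustration, not the code in the PR: `HlcTableCheck`, its method names, and the reduction of a table config to a plain `Map` are all hypothetical, and the bare `consumer.type` key is taken from the wording of the description (real configs may prefix it with the stream type).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Pinot code: each table config is reduced to a plain map.
class HlcTableCheck {
  // Assumed key, as written in the PR description.
  static final String CONSUMER_TYPE_KEY = "consumer.type";

  /** Returns true if any comma-separated consumer type is "highlevel". */
  static boolean usesHighLevelConsumer(Map<String, String> streamConfig) {
    String types = streamConfig.get(CONSUMER_TYPE_KEY);
    if (types == null) {
      return false;
    }
    for (String type : types.split(",")) {
      if ("highlevel".equalsIgnoreCase(type.trim())) {
        return true;
      }
    }
    return false;
  }

  /** Add-table path: reject a single new table config. */
  static void validateNewTable(String tableName, Map<String, String> streamConfig) {
    if (usesHighLevelConsumer(streamConfig)) {
      throw new IllegalStateException("High level consumer is not supported: " + tableName);
    }
  }

  /** Startup path: collect every offending table, then fail once with the full list. */
  static void failIfHlcTablesExist(Map<String, Map<String, String>> streamConfigsByTable) {
    List<String> hlcTables = new ArrayList<>();
    streamConfigsByTable.forEach((tableName, streamConfig) -> {
      if (usesHighLevelConsumer(streamConfig)) {
        hlcTables.add(tableName);
      }
    });
    if (!hlcTables.isEmpty()) {
      throw new RuntimeException("Found tables with high level consumers: " + hlcTables);
    }
  }
}
```

   Collecting all offending tables before throwing means an operator sees every table that needs migration in one startup failure rather than one per restart.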


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1277119385


##########
pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {

Review Comment:
   I was wrong. I added this because I moved `HelixBrokerStarterTest` from using the `hlc` consumer to the `llc` consumer.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1276654705


##########
pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {

Review Comment:
   Not needed. Will remove it.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1278176777


##########
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java:
##########
@@ -136,13 +138,17 @@ public void testCompatibilityWithTableConfig() {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build();
     tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(Map.of(StreamConfigProperties.STREAM_TYPE, "foo"))

Review Comment:
   We will eventually remove the notion of consumerType anyway.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1273923026


##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java:
##########
@@ -26,6 +26,7 @@
  * Test implementation of {@link StreamLevelConsumer}
  * This is currently a no-op
  */
+@Deprecated(since = "Pinot no longer support high level consumer model since v0.12.*")

Review Comment:
   Yes. After removing the integration test, this doesn't make sense. Will remove it.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1273922540


##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);

Review Comment:
   In the past, I was advised to use `IngestionConfigUtils.getStreamConfigMap`, which searches for configs in two places: TableConfig#IngestionConfig and TableConfig#IndexingConfig. This is likely for compatibility reasons.
   We should fix that separately (if not already done) by rewriting existing table configs to move stream configs from IndexingConfig into IngestionConfig.
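
   The two-location lookup and the proposed rewrite can be sketched as follows. This is a rough illustration under stated assumptions, not the actual `IngestionConfigUtils` code: `TableConfigSketch` is a hypothetical stand-in that reduces each location to one map, and the lookup order (ingestion config first, indexing config as fallback) is assumed.

```java
import java.util.Map;

// Hypothetical sketch, not Pinot code.
class StreamConfigLookup {
  /** Stand-in for a table config that may carry stream configs in either location. */
  static final class TableConfigSketch {
    final Map<String, String> ingestionStreamConfig; // newer location, may be null
    final Map<String, String> indexingStreamConfig;  // legacy location, may be null

    TableConfigSketch(Map<String, String> ingestionStreamConfig, Map<String, String> indexingStreamConfig) {
      this.ingestionStreamConfig = ingestionStreamConfig;
      this.indexingStreamConfig = indexingStreamConfig;
    }
  }

  private static boolean isPresent(Map<String, String> map) {
    return map != null && !map.isEmpty();
  }

  /** Assumed order: prefer the ingestion config, fall back to the indexing config. */
  static Map<String, String> getStreamConfigMap(TableConfigSketch tableConfig) {
    if (isPresent(tableConfig.ingestionStreamConfig)) {
      return tableConfig.ingestionStreamConfig;
    }
    if (isPresent(tableConfig.indexingStreamConfig)) {
      return tableConfig.indexingStreamConfig;
    }
    throw new IllegalStateException("Could not find stream configs in either location");
  }

  /** One-off rewrite: move legacy stream configs into the ingestion config. */
  static TableConfigSketch migrate(TableConfigSketch tableConfig) {
    if (isPresent(tableConfig.ingestionStreamConfig)) {
      return tableConfig; // already in the newer location
    }
    return new TableConfigSketch(tableConfig.indexingStreamConfig, null);
  }
}
```

   Once every stored config has been through a rewrite like `migrate`, callers would no longer need the fallback branch.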





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1273924051


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -126,6 +126,7 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     String consumerFactoryClassKey =
         StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
     // For backward compatibility, default consumer factory is for Kafka.
+    // TODO: remove this default and make it mandatory to have a factory class

Review Comment:
   I thought we shouldn't have a default stream system, so that config checks are stricter. I can remove the comment if you disagree.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1278176712


##########
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java:
##########
@@ -136,13 +138,17 @@ public void testCompatibilityWithTableConfig() {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build();
     tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(Map.of(StreamConfigProperties.STREAM_TYPE, "foo"))

Review Comment:
   I added a validation in TableConfigUtils, which exposes the existing misconfigurations in this test. Since these tests are checking for misconfiguration, adding a standard test util method to fetch stream configs isn't going to help much. So, I think this change can remain.





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1281425878


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -223,6 +212,17 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  public static void validateConsumerType(String streamType, Map<String, String> streamConfigMap) {
+    String consumerTypesKey =
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES);
+    String consumerTypes = streamConfigMap.get(consumerTypesKey);
+    Preconditions.checkNotNull(consumerTypes, consumerTypesKey + " cannot be null");
+    for (String consumerType : consumerTypes.split(",")) {
+      Preconditions.checkState(ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+              || SIMPLE_CONSUMER_TYPE_STRING.equalsIgnoreCase(consumerType),

Review Comment:
   Yes, and this precondition allows it.
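
   For readers without Guava at hand, the precondition in the hunk above can be restated in plain Java. This is a sketch that mirrors the quoted diff, with assumptions: the key format `stream.<type>.consumer.type` and the value `"simple"` for `SIMPLE_CONSUMER_TYPE_STRING` are not shown in the diff and are assumed here, and `ConsumerTypeValidation` is a hypothetical class name.

```java
import java.util.Map;

// Hypothetical plain-Java restatement of the validateConsumerType hunk.
class ConsumerTypeValidation {
  static final String LOWLEVEL = "lowlevel";
  static final String SIMPLE = "simple"; // assumed value of SIMPLE_CONSUMER_TYPE_STRING

  /** Assumed key format, standing in for StreamConfigProperties.constructStreamProperty. */
  static String consumerTypesKey(String streamType) {
    return "stream." + streamType + ".consumer.type";
  }

  static void validateConsumerType(String streamType, Map<String, String> streamConfigMap) {
    String key = consumerTypesKey(streamType);
    String consumerTypes = streamConfigMap.get(key);
    // Same exception type as Preconditions.checkNotNull.
    if (consumerTypes == null) {
      throw new NullPointerException(key + " cannot be null");
    }
    // Like the diff, entries are not trimmed, so "lowlevel, simple" would be rejected.
    for (String consumerType : consumerTypes.split(",")) {
      boolean allowed = LOWLEVEL.equalsIgnoreCase(consumerType) || SIMPLE.equalsIgnoreCase(consumerType);
      // Same exception type as Preconditions.checkState.
      if (!allowed) {
        throw new IllegalStateException("Unsupported consumer type: " + consumerType);
      }
    }
  }
}
```

   The check passes when every listed type is either the low-level type or the simple alias, and fails as soon as any other type (such as `highlevel`) appears in the list.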





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1274221321


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -126,6 +126,7 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     String consumerFactoryClassKey =
         StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
     // For backward compatibility, default consumer factory is for Kafka.
+    // TODO: remove this default and make it mandatory to have a factory class

Review Comment:
   IMO it is okay to use Kafka by default, as long as we document it





[GitHub] [pinot] codecov-commenter commented on pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11017:
URL: https://github.com/apache/pinot/pull/11017#issuecomment-1615248955

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11017](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (5df090e) into [master](https://app.codecov.io/gh/apache/pinot/commit/0b097a8dab7d2953d331010c856420967fe873e1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (0b097a8) will **decrease** coverage by `0.12%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #11017      +/-   ##
   ==========================================
   - Coverage    0.11%    0.00%   -0.12%     
   ==========================================
     Files        2192     2176      -16     
     Lines      118016   117589     -427     
     Branches    17869    17816      -53     
   ==========================================
   - Hits          137        0     -137     
   + Misses     117859   117589     -270     
   + Partials       20        0      -20     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `?` | |
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `0.00% <0.00%> (ø)` | |
   | integration2temurin11 | `?` | |
   | integration2temurin17 | `?` | |
   | unittests1temurin17 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `?` | |
   | unittests2temurin17 | `?` | |
   | unittests2temurin20 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...apache/pinot/controller/BaseControllerStarter.java](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9CYXNlQ29udHJvbGxlclN0YXJ0ZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...va/org/apache/pinot/controller/ControllerConf.java](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyQ29uZi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...lugin/stream/kafka20/KafkaStreamLevelConsumer.java](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthU3RyZWFtTGV2ZWxDb25zdW1lci5qYXZh) | `0.00% <ø> (ø)` | |
   | [...lugin/stream/pulsar/PulsarStreamLevelConsumer.java](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LXB1bHNhci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9wdWxzYXIvUHVsc2FyU3RyZWFtTGV2ZWxDb25zdW1lci5qYXZh) | `0.00% <ø> (ø)` | |
   | [...he/pinot/segment/local/utils/TableConfigUtils.java](https://app.codecov.io/gh/apache/pinot/pull/11017?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91dGlscy9UYWJsZUNvbmZpZ1V0aWxzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   
   ... and [18 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11017/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1281431147


##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -502,6 +495,24 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        try {
+          StreamConfig.validateConsumerType(
+              streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap);

Review Comment:
   Yeah, I don't want to change it in this PR; it might break more tests. Also, if the default was assumed to be "kafka" before, then existing LLC tables might also end up breaking due to the mandatory stream type check here.
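
   The compatibility concern can be shown with a small comparison of the lenient and strict lookups. This is only a sketch: `StreamTypeDefault` and its methods are hypothetical, and `"streamType"` is assumed to be the value behind `StreamConfigProperties.STREAM_TYPE`.

```java
import java.util.Map;

// Hypothetical sketch, not Pinot code.
class StreamTypeDefault {
  static final String STREAM_TYPE_KEY = "streamType"; // assumed value of STREAM_TYPE
  static final String DEFAULT_STREAM_TYPE = "kafka";

  /** Lenient lookup, as in the hunk above: a missing stream type falls back to "kafka". */
  static String resolveStreamType(Map<String, String> streamConfigMap) {
    return streamConfigMap.getOrDefault(STREAM_TYPE_KEY, DEFAULT_STREAM_TYPE);
  }

  /** Strict lookup: a missing stream type is an error. */
  static String resolveStreamTypeStrict(Map<String, String> streamConfigMap) {
    String streamType = streamConfigMap.get(STREAM_TYPE_KEY);
    if (streamType == null) {
      throw new IllegalStateException(STREAM_TYPE_KEY + " must be set");
    }
    return streamType;
  }
}
```

   An existing table whose config omits the stream type resolves to `kafka` under the lenient lookup but would fail the strict one, which is the breakage described above.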





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1281456311


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -223,6 +212,17 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  public static void validateConsumerType(String streamType, Map<String, String> streamConfigMap) {
+    String consumerTypesKey =
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES);
+    String consumerTypes = streamConfigMap.get(consumerTypesKey);
+    Preconditions.checkNotNull(consumerTypes, consumerTypesKey + " cannot be null");
+    for (String consumerType : consumerTypes.split(",")) {
+      Preconditions.checkState(ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+              || SIMPLE_CONSUMER_TYPE_STRING.equalsIgnoreCase(consumerType),

Review Comment:
   Ah, sorry I misread it





[GitHub] [pinot] Jackie-Jiang merged pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #11017:
URL: https://github.com/apache/pinot/pull/11017




[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1276652701


##########
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java:
##########
@@ -136,13 +138,17 @@ public void testCompatibilityWithTableConfig() {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build();
     tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(Map.of(StreamConfigProperties.STREAM_TYPE, "foo"))

Review Comment:
   > Is this correct? Do we need topic name, consumer type etc? Same for other tests
   
   Yeah we need to improve validations for our realtime table configs. It's the same for all other tests. 





[GitHub] [pinot] navina commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1276648350


##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java:
##########
@@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
   private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
   private final TableConfigBuilder _offlineBuilder = new TableConfigBuilder(TableType.OFFLINE);
-  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME);
+  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME)
+      .setStreamConfigs(Map.of("stream.type", "foo", "consumer.type", "lowlevel"));

Review Comment:
   Yes. We don't seem to validate stream.type. Is that what you are referring to? 





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1273003693


##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);

Review Comment:
   Please follow the same logic as `StreamConfig`, or just use `StreamConfig` to extract the `ConsumerType`



##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java:
##########
@@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
   private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
   private final TableConfigBuilder _offlineBuilder = new TableConfigBuilder(TableType.OFFLINE);
-  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME);
+  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME)
+      .setStreamConfigs(Map.of("stream.type", "foo", "consumer.type", "lowlevel"));

Review Comment:
   Is this correct?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -135,6 +135,20 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
     }
     // Sanitize the table config before validation
     sanitize(tableConfig);
+
+    // Only allow realtime tables with LLC consumer.type
+    if (tableConfig.getTableType() == TableType.REALTIME) {

Review Comment:
   Let's move this check into a util method, and have the controller starter call the same util.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -993,11 +994,7 @@ public String getLeadControllerResourceRebalanceStrategy() {
   }
 
   public boolean getHLCTablesAllowed() {

Review Comment:
   Let's just remove it



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        String streamType = streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka");
+        String consumerType = streamConfigMap.get(StreamConfigProperties.constructStreamProperty(streamType,
+            StreamConfigProperties.STREAM_CONSUMER_TYPES));
+        if (StreamConfig.ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+            || "simple".equalsIgnoreCase(consumerType)) {
+          return;
+        } else {
+          existingHlcTables.add(rt);
+        }
+      }
+    });
+    if (existingHlcTables.size() > 0) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+          + "following HLC tables before proceeding: \n");
+      existingHlcTables.forEach(s -> LOGGER.error("{}\n", s));

Review Comment:
   Let's log it on a single line:
   ```suggestion
         LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
             + "following HLC tables before proceeding: {}", existingHlcTables);
   ```
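
As a rough illustration of why one line is enough: a `List<String>` already renders as `[a, b]` through its `toString()`, which is also what a `{}` placeholder produces for a list argument. The sketch below uses plain string concatenation instead of a logger so that it stays self-contained; the class and method names are hypothetical.

```java
import java.util.List;

// Hypothetical helper that builds the single-line HLC error message.
final class HlcMessageSketch {
  static String hlcErrorMessage(List<String> existingHlcTables) {
    // The list is appended via its toString(), so all table names stay on one line.
    return "High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
        + "following HLC tables before proceeding: " + existingHlcTables;
  }
}
```

A single message is also easier to find and grep in controller logs than one log entry per table.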



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java:
##########
@@ -26,6 +26,7 @@
  * Test implementation of {@link StreamLevelConsumer}
  * This is currently a no-op
  */
+@Deprecated(since = "Pinot no longer support high level consumer model since v0.12.*")

Review Comment:
   Should we just clean them up?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        String streamType = streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka");
+        String consumerType = streamConfigMap.get(StreamConfigProperties.constructStreamProperty(streamType,
+            StreamConfigProperties.STREAM_CONSUMER_TYPES));
+        if (StreamConfig.ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+            || "simple".equalsIgnoreCase(consumerType)) {
+          return;
+        } else {
+          existingHlcTables.add(rt);
+        }
+      }
+    });
+    if (existingHlcTables.size() > 0) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+          + "following HLC tables before proceeding: \n");
+      existingHlcTables.forEach(s -> LOGGER.error("{}\n", s));
+      throw new RuntimeException("Unable to start controller due to existing HLC tables!");

Review Comment:
   ```suggestion
         throw new RuntimeException("Unable to start controller due to existing HLC tables: " + existingHlcTables);
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -126,6 +126,7 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     String consumerFactoryClassKey =
         StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
     // For backward compatibility, default consumer factory is for Kafka.
+    // TODO: remove this default and make it mandatory to have a factory class

Review Comment:
   I don't follow this TODO. We want to default to low-level Kafka.



##########
pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {

Review Comment:
   Do we need this?



##########
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java:
##########
@@ -136,13 +138,17 @@ public void testCompatibilityWithTableConfig() {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build();
     tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(Map.of(StreamConfigProperties.STREAM_TYPE, "foo"))

Review Comment:
   Is this correct? Do we need the topic name, consumer type, etc.? The same applies to the other tests.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1281271373


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java:
##########
@@ -41,6 +41,7 @@
 /**
  * An implementation of a {@link StreamLevelConsumer} which consumes from the kafka stream
  */
+@Deprecated(since = "Pinot no longer support high level consumer model since v0.12.*")

Review Comment:
   Do not add `since` here: it was introduced in Java 9, and some users are still on Java 8.
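
A Java 8 compatible way to carry the same information is a bare `@Deprecated` annotation plus a Javadoc `@deprecated` tag holding the explanation. The sketch below is only an illustration; `DeprecationSketch` and `LegacyStreamLevelConsumer` are hypothetical names, not classes from the PR.

```java
// Hypothetical example of deprecating a class without the Java 9+ "since" element.
final class DeprecationSketch {
  /**
   * Placeholder for a stream-level (high-level) consumer implementation.
   *
   * @deprecated The high-level consumer model is no longer supported; use the low-level consumer instead.
   */
  @Deprecated
  static class LegacyStreamLevelConsumer {
  }
}
```

Apart from the compatibility concern, the `since` element is meant to hold a version string, so the longer explanation fits better in the Javadoc tag anyway.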



##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java:
##########
@@ -33,6 +33,7 @@
 /**
  * A {@link StreamLevelConsumer} implementation for the Pulsar stream
  */
+@Deprecated(since = "Pinot no longer support high level consumer model since v0.12.*")

Review Comment:
   Same here



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -223,6 +212,17 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     _streamConfigMap.putAll(streamConfigMap);
   }
 
+  public static void validateConsumerType(String streamType, Map<String, String> streamConfigMap) {
+    String consumerTypesKey =
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES);
+    String consumerTypes = streamConfigMap.get(consumerTypesKey);
+    Preconditions.checkNotNull(consumerTypes, consumerTypesKey + " cannot be null");
+    for (String consumerType : consumerTypes.split(",")) {
+      Preconditions.checkState(ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+              || SIMPLE_CONSUMER_TYPE_STRING.equalsIgnoreCase(consumerType),

Review Comment:
   (MAJOR) `SIMPLE_CONSUMER_TYPE_STRING` is allowed, and it is equivalent to LLC (the low-level consumer).
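
The acceptance rule discussed here, where both `lowlevel` and `simple` pass and anything else is rejected, can be sketched as follows. This is a dependency-free approximation, not the PR's code: it throws `IllegalStateException` directly instead of using Guava `Preconditions`, trims whitespace around entries, and the class and method names (including `findUnsupportedTables`, which mimics how the controller collects offending tables) are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical validation sketch: only low-level consumer types are accepted.
final class ConsumerTypeValidationSketch {
  // "simple" is accepted as an equivalent of "lowlevel".
  static boolean isLowLevel(String consumerType) {
    String trimmed = consumerType.trim();
    return "lowlevel".equalsIgnoreCase(trimmed) || "simple".equalsIgnoreCase(trimmed);
  }

  // Validates a comma-separated consumer type value; throws on the first unsupported entry.
  static void validateConsumerTypes(String consumerTypes) {
    if (consumerTypes == null) {
      throw new IllegalStateException("consumer type cannot be null");
    }
    for (String consumerType : consumerTypes.split(",")) {
      if (!isLowLevel(consumerType)) {
        throw new IllegalStateException("Unsupported consumer type: " + consumerType.trim());
      }
    }
  }

  // Returns the tables whose consumer type value fails validation, in iteration order.
  static List<String> findUnsupportedTables(Map<String, String> consumerTypesByTable) {
    List<String> unsupported = new ArrayList<>();
    for (Map.Entry<String, String> entry : consumerTypesByTable.entrySet()) {
      try {
        validateConsumerTypes(entry.getValue());
      } catch (IllegalStateException e) {
        unsupported.add(entry.getKey());
      }
    }
    return unsupported;
  }
}
```

With a single validation method, table creation and controller startup reject exactly the same set of values.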



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -502,6 +495,24 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        try {
+          StreamConfig.validateConsumerType(
+              streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka"), streamConfigMap);

Review Comment:
   (minor) `STREAM_TYPE` is a mandatory field, so we shouldn't need a default here.
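
Treating the stream type as mandatory could look roughly like the sketch below, which fails fast instead of silently falling back to `"kafka"`. The key name `streamType`, the class name, and the exception type are assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical lookup that treats the stream type as a required field.
final class RequiredStreamTypeSketch {
  // Assumed config key for the stream type.
  static final String STREAM_TYPE_KEY = "streamType";

  static String requireStreamType(Map<String, String> streamConfigMap) {
    String streamType = streamConfigMap.get(STREAM_TYPE_KEY);
    if (streamType == null || streamType.isEmpty()) {
      throw new IllegalStateException(STREAM_TYPE_KEY + " must be set in the stream config");
    }
    return streamType;
  }
}
```

Failing here surfaces a misconfigured table directly, rather than validating it against Kafka-specific keys it never defined.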


