You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/07/25 05:22:49 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11017: Remove support for High level consumers in Apache Pinot

Jackie-Jiang commented on code in PR #11017:
URL: https://github.com/apache/pinot/pull/11017#discussion_r1273003693


##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);

Review Comment:
   Please follow the same logic as `StreamConfig`, or just use `StreamConfig` to extract the `ConsumerType`



##########
pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTableRestletResourceTest.java:
##########
@@ -58,7 +58,8 @@ public class PinotTableRestletResourceTest extends ControllerTest {
   private static final String OFFLINE_TABLE_NAME = "testOfflineTable";
   private static final String REALTIME_TABLE_NAME = "testRealtimeTable";
   private final TableConfigBuilder _offlineBuilder = new TableConfigBuilder(TableType.OFFLINE);
-  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME);
+  private final TableConfigBuilder _realtimeBuilder = new TableConfigBuilder(TableType.REALTIME)
+      .setStreamConfigs(Map.of("stream.type", "foo", "consumer.type", "lowlevel"));

Review Comment:
   Is this correct?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -135,6 +135,20 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
     }
     // Sanitize the table config before validation
     sanitize(tableConfig);
+
+    // Only allow realtime tables with LLC consumer.type
+    if (tableConfig.getTableType() == TableType.REALTIME) {

Review Comment:
   Let's make a util for this, and make the controller starter also call the util



##########
pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java:
##########
@@ -993,11 +994,7 @@ public String getLeadControllerResourceRebalanceStrategy() {
   }
 
   public boolean getHLCTablesAllowed() {

Review Comment:
   Let's just remove it



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        String streamType = streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka");
+        String consumerType = streamConfigMap.get(StreamConfigProperties.constructStreamProperty(streamType,
+            StreamConfigProperties.STREAM_CONSUMER_TYPES));
+        if (StreamConfig.ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+            || "simple".equalsIgnoreCase(consumerType)) {
+          return;
+        } else {
+          existingHlcTables.add(rt);
+        }
+      }
+    });
+    if (existingHlcTables.size() > 0) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+          + "following HLC tables before proceeding: \n");
+      existingHlcTables.forEach(s -> LOGGER.error("{}\n", s));

Review Comment:
   Let's log it in one line
   ```suggestion
         LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
             + "following HLC tables before proceeding: {}", existingHlcTables);
   ```



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java:
##########
@@ -26,6 +26,7 @@
  * Test implementation of {@link StreamLevelConsumer}
  * This is currently a no-op
  */
+@Deprecated(since = "Pinot no longer support high level consumer model since v0.12.*")

Review Comment:
   Should we just clean them up?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -501,6 +494,28 @@ protected void configure() {
 
     LOGGER.info("Starting controller admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
     _adminApp.start(_listenerConfigs);
+    List<String> existingHlcTables = new ArrayList<>();
+    _helixResourceManager.getAllRealtimeTables().forEach(rt -> {
+      TableConfig tableConfig = _helixResourceManager.getTableConfig(rt);
+      if (tableConfig != null) {
+        Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap(tableConfig);
+        String streamType = streamConfigMap.getOrDefault(StreamConfigProperties.STREAM_TYPE, "kafka");
+        String consumerType = streamConfigMap.get(StreamConfigProperties.constructStreamProperty(streamType,
+            StreamConfigProperties.STREAM_CONSUMER_TYPES));
+        if (StreamConfig.ConsumerType.LOWLEVEL.name().equalsIgnoreCase(consumerType)
+            || "simple".equalsIgnoreCase(consumerType)) {
+          return;
+        } else {
+          existingHlcTables.add(rt);
+        }
+      }
+    });
+    if (existingHlcTables.size() > 0) {
+      LOGGER.error("High Level Consumer (HLC) based realtime tables are no longer supported. Please delete the "
+          + "following HLC tables before proceeding: \n");
+      existingHlcTables.forEach(s -> LOGGER.error("{}\n", s));
+      throw new RuntimeException("Unable to start controller due to existing HLC tables!");

Review Comment:
   ```suggestion
         throw new RuntimeException("Unable to start controller due to existing HLC tables: " + existingHlcTables);
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java:
##########
@@ -126,6 +126,7 @@ public StreamConfig(String tableNameWithType, Map<String, String> streamConfigMa
     String consumerFactoryClassKey =
         StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS);
     // For backward compatibility, default consumer factory is for Kafka.
+    // TODO: remove this default and make it mandatory to have a factory class

Review Comment:
   I don't follow this TODO. We want to default to low level Kafka



##########
pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java:
##########
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.broker;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+
+
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {

Review Comment:
   Do we need this?



##########
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java:
##########
@@ -136,13 +138,17 @@ public void testCompatibilityWithTableConfig() {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS").build();
     tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+            .setStreamConfigs(Map.of(StreamConfigProperties.STREAM_TYPE, "foo"))

Review Comment:
   Is this correct?  Do we need topic name, consumer type etc? Same for other tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org