Posted to commits@pinot.apache.org by mc...@apache.org on 2023/11/28 21:05:19 UTC

(pinot) branch master updated: Fix incorrect handling of consumer creation errors (#12045)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe2d25c21 Fix incorrect handling of consumer creation errors (#12045)
6fe2d25c21 is described below

commit 6fe2d25c21fdc9b8364b6e594d96b78e907717e6
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Tue Nov 28 13:05:13 2023 -0800

    Fix incorrect handling of consumer creation errors (#12045)
    
    * Fix incorrect handling of consumer creation errors
    
    The current handling of exceptions during consumer creation is
    incorrect: the ExternalView for the segment remains in ERROR
    state while we set the IdealState to OFFLINE. This happens on
    one replica, while other replicas may consume fine and eventually
    reach ONLINE state. At that point, however, the replica that had
    problems consuming cannot transition to ONLINE, since a
    transition from ERROR to ONLINE is not supported by Helix.
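
    As a sketch of the stuck state this describes (a fragment, not part
    of this patch; it assumes a HelixAdmin handle and the obvious names
    in scope):

        ExternalView ev = helixAdmin.getResourceExternalView(clusterName, realtimeTableName);
        IdealState is = helixAdmin.getResourceIdealState(clusterName, realtimeTableName);
        String evState = ev.getStateMap(segmentName).get(instanceName);          // "ERROR"
        String isState = is.getInstanceStateMap(segmentName).get(instanceName);  // "OFFLINE"
        // Helix never issues an ERROR -> ONLINE transition, so this replica stays stuck.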
    
    A partition state of ERROR is a special state in Helix: Helix does
    not handle transitions out of ERROR the way it handles other
    states. Instead, Helix provides an admin API to reset the state of
    a partition from ERROR to its starting state (which in our case is
    OFFLINE). When this reset API is invoked, a state transition
    message from ERROR to the starting state is sent to the specific
    instance that hosts the partition in question. If the participant's
    current state is not ERROR, this message is discarded automatically
    (and Pinot never sees it). Otherwise, it is passed on to Pinot, and
    we get a transition from ERROR to OFFLINE.
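
    A minimal sketch of the reset call (a fragment; cluster, instance
    and segment names are assumed to be in scope; this is the same
    Helix admin API the patch invokes below):

        helixAdmin.resetPartition(clusterName, instanceName,
            realtimeTableName, Collections.singletonList(segmentName));
        // Helix responds by sending an ERROR -> OFFLINE transition to that instance;
        // a participant whose current state is not ERROR silently drops the message.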
    
    Tested by manually inserting an exception in the consumer creation
    code and observing that the ExternalView changes to ERROR, and then
    later to OFFLINE.
    
    * Ignore Helix exception on reset call
    
    Helix throws an exception if we call reset on a partition that is not in ERROR
    state in the ExternalView. So, we need to ignore any exception that Helix throws
    on the reset call (it is a best-effort call anyway).
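
    As a best-effort wrapper (a sketch mirroring the change in the diff
    below, with the same assumed names as above):

        try {
          helixAdmin.resetPartition(clusterName, instanceName,
              realtimeTableName, Collections.singletonList(segmentName));
        } catch (Exception e) {
          // Ignore: Helix throws if the partition is not in ERROR state in the EV.
        }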
    
    * Address linter check
    
    * Added tests
    
    * Addressed review comments
    
    * Addressed review comment to fix space
---
 .../controller/helix/ControllerRequestClient.java  |  10 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  16 +++
 .../realtime/RealtimeSegmentDataManager.java       |  55 +++++++-
 .../tests/BaseRealtimeClusterIntegrationTest.java  |   6 +
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 149 +++++++++++++++++++++
 .../utils/builder/ControllerRequestURLBuilder.java |   4 +
 6 files changed, 233 insertions(+), 7 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index ff27954f70..e37172e3b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -325,6 +325,16 @@ public class ControllerRequestClient {
     }
   }
 
+  public void runPeriodicTask(String taskName)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendGetRequest(new URL(
+          _controllerRequestURLBuilder.forPeriodTaskRun(taskName)).toURI()));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   protected String getBrokerTenantRequestPayload(String tenantName, int numBrokers) {
     return new Tenant(TenantRole.BROKER, tenantName, numBrokers, 0, 0).toJsonString();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 90cd05d16b..092e47cced 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -821,6 +821,22 @@ public class PinotLLCRealtimeSegmentManager {
       _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
       throw e;
     }
+    // We know that we have successfully set the idealstate to be OFFLINE.
+    // We can now make a best-effort attempt to reset the externalview to OFFLINE if it is in ERROR state.
+    // If the externalview is not in ERROR state, this reset will be ignored by the Helix participant
+    // on the server when it receives the ERROR to OFFLINE state transition.
+    // Helix throws an exception if we try to reset the state of a partition that is NOT in ERROR state in EV.
+    // So, if any exceptions are thrown, ignore them here.
+    // TODO: https://github.com/apache/pinot/issues/12055
+    // If we have reason codes, then the server can indicate to us the reason (in this case, consumer was never
+    // created, OR consumer was created but could not consume the segment completely), and we can call reset()
+    // in one of the cases and not the other.
+    try {
+      _helixAdmin.resetPartition(_helixManager.getClusterName(), instanceName,
+          realtimeTableName, Collections.singletonList(segmentName));
+    } catch (Exception e) {
+      // Ignore
+    }
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 3b66bc97fc..0f880c5f22 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1134,16 +1134,40 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     }
   }
 
-  // Inform the controller that the server had to stop consuming due to an error.
-  protected void postStopConsumedMsg(String reason) {
-    do {
+  private static class ConsumptionStopIndicator {
+    final StreamPartitionMsgOffset _offset;
+    final String _segmentName;
+    final String _instanceId;
+    final Logger _logger;
+    final ServerSegmentCompletionProtocolHandler _protocolHandler;
+    final String _reason;
+    private ConsumptionStopIndicator(StreamPartitionMsgOffset offset, String segmentName, String instanceId,
+        ServerSegmentCompletionProtocolHandler protocolHandler, String reason, Logger logger) {
+      _offset = offset;
+      _segmentName = segmentName;
+      _instanceId = instanceId;
+      _protocolHandler = protocolHandler;
+      _logger = logger;
+      _reason = reason;
+    }
+
+    SegmentCompletionProtocol.Response postSegmentStoppedConsuming() {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+      params.withStreamPartitionMsgOffset(_offset.toString()).withReason(_reason).withSegmentName(_segmentName)
           .withInstanceId(_instanceId);
 
       SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
+      _logger.info("Got response {}", response.toJsonString());
+      return response;
+    }
+  }
+  // Inform the controller that the server had to stop consuming due to an error.
+  protected void postStopConsumedMsg(String reason) {
+    ConsumptionStopIndicator indicator = new ConsumptionStopIndicator(_currentOffset,
+        _segmentNameStr, _instanceId, _protocolHandler, reason, _segmentLogger);
+    do {
+      SegmentCompletionProtocol.Response response = indicator.postSegmentStoppedConsuming();
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
-        _segmentLogger.info("Got response {}", response.toJsonString());
         break;
       }
       Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
@@ -1491,9 +1515,26 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
           "Failed to initialize segment data manager", e));
       _segmentLogger.warn(
-          "Calling controller to mark the segment as OFFLINE in Ideal State because of initialization error: '{}'",
+          "Scheduling task to call controller to mark the segment as OFFLINE in Ideal State due"
+           + " to initialization error: '{}'",
           e.getMessage());
-      postStopConsumedMsg("Consuming segment initialization error");
+      // Since we are going to throw an exception from this thread (the Helix execution thread), the externalview
+      // entry for this segment will be ERROR. We allow time for Helix to make this transition, and then
+      // invoke the controller API to mark it OFFLINE in the idealstate.
+      new Thread(() -> {
+        ConsumptionStopIndicator indicator = new ConsumptionStopIndicator(_currentOffset, _segmentNameStr, _instanceId,
+            _protocolHandler, "Consuming segment initialization error", _segmentLogger);
+        try {
+          // Allow 30s for Helix to set the currentstate and externalview to ERROR, because
+          // we are about to receive an ERROR->OFFLINE state transition once we call
+          // the postSegmentStoppedConsuming() method.
+          Thread.sleep(30_000);
+          indicator.postSegmentStoppedConsuming();
+        } catch (InterruptedException ie) {
+          // We got interrupted while trying to post the stop-consumed message. Give up at this point.
+          return;
+        }
+      }).start();
       throw e;
     }
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
index af39f3280e..6774dc52ab 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java
@@ -74,10 +74,16 @@ public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterInte
     // Initialize the query generator
     setUpQueryGenerator(avroFiles);
 
+    runValidationJob(600_000);
+
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
   }
 
+  protected void runValidationJob(long timeoutMs)
+      throws Exception {
+  }
+
   protected void createSegmentsAndUpload(List<File> avroFile, Schema schema, TableConfig tableConfig)
       throws Exception {
     // Do nothing. This is specific to LLC use cases for now.
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 51b19993f1..2389fe8ba6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -32,13 +34,20 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.http.HttpStatus;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory;
+import org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConsumer;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -46,12 +55,19 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.junit.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -101,6 +117,62 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
     }
   }
 
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    // Make sure the realtime segment validation manager does not run by itself, only when we invoke it.
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD, "2h");
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS,
+        3600);
+  }
+
+  @Override
+  protected void runValidationJob(long timeoutMs)
+      throws Exception {
+    final int partition = ExceptingKafkaConsumerFactory.PARTITION_FOR_EXCEPTIONS;
+    if (partition < 0) {
+      return;
+    }
+    int[] seqNumbers = {ExceptingKafkaConsumerFactory.SEQ_NUM_FOR_CREATE_EXCEPTION,
+        ExceptingKafkaConsumerFactory.SEQ_NUM_FOR_CONSUME_EXCEPTION};
+    Arrays.sort(seqNumbers);
+    for (int seqNum : seqNumbers) {
+      if (seqNum < 0) {
+        continue;
+      }
+      TestUtils.waitForCondition(() -> isOffline(partition, seqNum), 5000L, timeoutMs,
+          "Failed to find offline segment in partition " + partition + " seqNum ", true,
+          Duration.ofMillis(timeoutMs / 10));
+      getControllerRequestClient().runPeriodicTask("RealtimeSegmentValidationManager");
+    }
+  }
+
+  private boolean isOffline(int partition, int seqNum) {
+    ExternalView ev = _helixAdmin.getResourceExternalView(getHelixClusterName(),
+        TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
+
+    boolean isOffline = false;
+    for (String segmentNameStr : ev.getPartitionSet()) {
+      if (LLCSegmentName.isLLCSegment(segmentNameStr)) {
+        LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+        if (segmentName.getSequenceNumber() == seqNum && segmentName.getPartitionGroupId() == partition
+            && ev.getStateMap(segmentNameStr).values().contains(
+            CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          isOffline = true;
+        }
+      }
+    }
+    return isOffline;
+  }
+
+  @Override
+  protected Map<String, String> getStreamConfigMap() {
+    Map<String, String> streamConfigMap = super.getStreamConfigMap();
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(
+        streamConfigMap.get(StreamConfigProperties.STREAM_TYPE),
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), ExceptingKafkaConsumerFactory.class.getName());
+    ExceptingKafkaConsumerFactory.init(getHelixClusterName(), _helixAdmin, getTableName());
+    return streamConfigMap;
+  }
   @Override
   protected IngestionConfig getIngestionConfig() {
     IngestionConfig ingestionConfig = new IngestionConfig();
@@ -372,4 +444,81 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr
       throws Exception {
     super.testHardcodedServerPartitionedSqlQueries();
   }
+
+  public static class ExceptingKafkaConsumerFactory extends KafkaConsumerFactory {
+
+    public static final int PARTITION_FOR_EXCEPTIONS = 1; // Setting this to -1 disables all exceptions thrown.
+    public static final int SEQ_NUM_FOR_CREATE_EXCEPTION = 1;
+    public static final int SEQ_NUM_FOR_CONSUME_EXCEPTION = 3;
+
+    private static HelixAdmin _helixAdmin;
+    private static String _helixClusterName;
+    private static String _tableName;
+    public ExceptingKafkaConsumerFactory() {
+      super();
+    }
+
+    public static void init(String helixClusterName, HelixAdmin helixAdmin, String tableName) {
+      _helixAdmin = helixAdmin;
+      _helixClusterName = helixClusterName;
+      _tableName = tableName;
+    }
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+      /*
+       * The segment data manager is creating a consumer to consume rows into a segment.
+       * Check the partition and sequence number of the segment and decide whether it
+       * qualifies for:
+       * - Throwing exception during create OR
+       * - Throwing exception during consumption.
+       * Make sure that this still works if retries are added in RealtimeSegmentDataManager
+       */
+      boolean exceptionDuringConsume = false;
+      int seqNum = getSegmentSeqNum(partition);
+      if (partition == PARTITION_FOR_EXCEPTIONS) {
+        if (seqNum == SEQ_NUM_FOR_CREATE_EXCEPTION) {
+          throw new RuntimeException("TestException during consumer creation");
+        } else if (seqNum == SEQ_NUM_FOR_CONSUME_EXCEPTION) {
+          exceptionDuringConsume = true;
+        }
+      }
+      return new ExceptingKafkaConsumer(clientId, _streamConfig, partition, exceptionDuringConsume);
+    }
+
+    private int getSegmentSeqNum(int partition) {
+      IdealState is = _helixAdmin.getResourceIdealState(_helixClusterName,
+          TableNameBuilder.REALTIME.tableNameWithType(_tableName));
+      AtomicInteger seqNum = new AtomicInteger(-1);
+      is.getPartitionSet().forEach(segmentNameStr -> {
+        if (LLCSegmentName.isLLCSegment(segmentNameStr)) {
+          if (is.getInstanceStateMap(segmentNameStr).values().contains(
+              CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) {
+            LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+            if (segmentName.getPartitionGroupId() == partition) {
+              seqNum.set(segmentName.getSequenceNumber());
+            }
+          }
+        }
+      });
+      Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0);
+      return seqNum.get();
+    }
+
+    public class ExceptingKafkaConsumer extends KafkaPartitionLevelConsumer {
+      private final boolean _exceptionDuringConsume;
+      public ExceptingKafkaConsumer(String clientId, StreamConfig streamConfig, int partition,
+          boolean exceptionDuringConsume) {
+        super(clientId, streamConfig, partition);
+        _exceptionDuringConsume = exceptionDuringConsume;
+      }
+      @Override
+      public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+          StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
+        if (_exceptionDuringConsume) {
+          throw new RuntimeException("TestException during consumption");
+        }
+        return super.fetchMessages(startMsgOffset, endMsgOffset, timeoutMillis);
+      }
+    }
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 192ba71ec0..d4f255b778 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -109,6 +109,10 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "users", username, params.toString());
   }
 
+  public String forPeriodTaskRun(String taskName) {
+    return StringUtil.join("/", _baseUrl, "periodictask", "run?taskname=" + taskName);
+  }
+
   public String forUpdateUserConfig(String username, String componentTypeStr, boolean passwordChanged) {
     StringBuilder params = new StringBuilder();
     if (StringUtils.isNotBlank(username)) {

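For reference, the new ControllerRequestURLBuilder.forPeriodTaskRun() helper composes
the controller's periodic-task endpoint. A hypothetical invocation (assuming the usual
baseUrl factory and a local controller at the default port) would look like:

    ControllerRequestURLBuilder urlBuilder = ControllerRequestURLBuilder.baseUrl("http://localhost:9000");
    String url = urlBuilder.forPeriodTaskRun("RealtimeSegmentValidationManager");
    // url -> http://localhost:9000/periodictask/run?taskname=RealtimeSegmentValidationManager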

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org