You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/03/22 16:33:16 UTC

[incubator-pinot] branch master updated: Set processingException when all queried segments cannot be acquired (#3942)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4f2ece  Set processingException when all queried segments cannot be acquired (#3942)
d4f2ece is described below

commit d4f2ecef660ab1d4efa9696a53b0623aac867c3f
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Mar 22 09:33:09 2019 -0700

    Set processingException when all queried segments cannot be acquired (#3942)
    
    * Set processingException when all queried segments cannot be acquired
    
    * Minor fixes
    
    * Address review comments
    
    * Disable setting processing exception until metrics are validated
    
    * Address review comments
    
    * Address review comments
    
    * Address review comments
    
    * Address review comments
    
    * Address review comments: Make InstanceDataManager symmetric with respect to tracking segments added and deleted
---
 .../pinot/common/exception/QueryException.java     |  2 +
 .../core/data/manager/BaseTableDataManager.java    | 48 +++++++++++++++++----
 .../core/data/manager/InstanceDataManager.java     | 14 ++++++
 .../pinot/core/data/manager/TableDataManager.java  | 15 +++++++
 .../query/executor/ServerQueryExecutorV1Impl.java  | 33 +++++++++++++-
 .../data/manager/BaseTableDataManagerTest.java     |  5 +++
 .../pinot/query/executor/QueryExecutorTest.java    | 50 ++++++++++++++++++++--
 .../starter/helix/HelixInstanceDataManager.java    | 16 +++++++
 .../SegmentOnlineOfflineStateModelFactory.java     |  5 +++
 9 files changed, 175 insertions(+), 13 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index e0ed903..6c45897 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -41,6 +41,7 @@ public class QueryException {
   public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160;
   public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
   public static final int ACCESS_DENIED_ERROR_CODE = 180;
+  public static final int SEGMENTS_MISSING_ERROR_CODE = 190;
   public static final int QUERY_EXECUTION_ERROR_CODE = 200;
   // TODO: Handle these errors in broker
   public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
@@ -96,6 +97,7 @@ public class QueryException {
   public static final ProcessingException QUERY_VALIDATION_ERROR = new ProcessingException(QUERY_VALIDATION_ERROR_CODE);
   public static final ProcessingException UNKNOWN_ERROR = new ProcessingException(UNKNOWN_ERROR_CODE);
   public static final ProcessingException QUOTA_EXCEEDED_ERROR = new ProcessingException(TOO_MANY_REQUESTS_ERROR_CODE);
+  public static final ProcessingException SEGMENTS_MISSING_ERROR = new ProcessingException(SEGMENTS_MISSING_ERROR_CODE);
 
   static {
     JSON_PARSING_ERROR.setMessage("JsonParsingError");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index a456691..3f074a1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -19,10 +19,13 @@
 package org.apache.pinot.core.data.manager;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.helix.ZNRecord;
@@ -42,9 +45,13 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BaseTableDataManager implements TableDataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class);
+  // cache deleted segment names for at most this duration
+  private static final int MAX_CACHE_DURATION_SEC = 6 * 3600; // 6 hours
 
   protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>();
 
+  protected Cache<String, Boolean> _deletedSegmentsCache;
+
   protected TableDataManagerConfig _tableDataManagerConfig;
   protected String _instanceId;
   protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -59,6 +66,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
       @Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull ServerMetrics serverMetrics) {
     LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
 
+    _deletedSegmentsCache = CacheBuilder.newBuilder().expireAfterWrite(MAX_CACHE_DURATION_SEC, TimeUnit.SECONDS).build();
     _tableDataManagerConfig = tableDataManagerConfig;
     _instanceId = instanceId;
     _propertyStore = propertyStore;
@@ -117,6 +125,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
 
     ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
     SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
+
+    // release old segment if needed
     if (oldSegmentManager == null) {
       _logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType);
     } else {
@@ -156,6 +166,35 @@ public abstract class BaseTableDataManager implements TableDataManager {
     }
   }
 
+  /**
+   * Called when a segment is deleted. The actual handling of segment delete is outside of this method.
+   * This method provides book-keeping around deleted segments.
+   * @param segmentName name of the segment to track.
+   */
+  public void notifySegmentDeleted(@Nonnull String segmentName) {
+    // add segment to the cache
+    _deletedSegmentsCache.put(segmentName, true);
+  }
+
+  /**
+   * Check if a segment is recently deleted.
+   *
+   * @param segmentName name of the segment to check.
+   * @return true if segment is in the cache, false otherwise
+   */
+  public boolean isRecentlyDeleted(@Nonnull String segmentName) {
+    return _deletedSegmentsCache.getIfPresent(segmentName) != null;
+  }
+
+  /**
+   * Remove a segment from the deleted cache if it is being added back.
+   *
+   * @param segmentName name of the segment that needs to be removed from the cache (if needed)
+   */
+  public void notifySegmentAdded(@Nonnull String segmentName) {
+    _deletedSegmentsCache.invalidate(segmentName);
+  }
+
   @Nonnull
   @Override
   public List<SegmentDataManager> acquireAllSegments() {
@@ -176,8 +215,6 @@ public abstract class BaseTableDataManager implements TableDataManager {
       SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
       if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
         segmentDataManagers.add(segmentDataManager);
-      } else {
-        handleMissingSegment(segmentName);
       }
     }
     return segmentDataManagers;
@@ -189,17 +226,10 @@ public abstract class BaseTableDataManager implements TableDataManager {
     if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
       return segmentDataManager;
     } else {
-      handleMissingSegment(segmentName);
       return null;
     }
   }
 
-  private void handleMissingSegment(String segmentName) {
-    // could not find segment
-    LOGGER.error("Could not find segment " + segmentName + " for table " + _tableNameWithType);
-    _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, 1);
-  }
-
   @Override
   public void releaseSegment(@Nonnull SegmentDataManager segmentDataManager) {
     if (segmentDataManager.decreaseReferenceCount()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 85090dc..9399c7a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -79,6 +79,20 @@ public interface InstanceDataManager {
       throws Exception;
 
   /**
+   * Handles addition of a segment to the table.
+   *
+   * This method performs book keeping of added segments, especially if the deleted-cache needs to be invalidated
+   */
+  void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName);
+
+  /**
+   * Handles deletion of a segment from the table.
+   *
+   * This method performs book keeping of deleted segments.
+   */
+  void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName);
+
+  /**
    * Reloads a segment in a table.
    */
   void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index c237fbf..16eae6b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -80,6 +80,21 @@ public interface TableDataManager {
   void removeSegment(@Nonnull String segmentName);
 
   /**
+   * Track a deleted segment.
+   */
+  void notifySegmentDeleted(@Nonnull String segmentName);
+
+  /**
+   * Track addition of a segment
+   */
+  void notifySegmentAdded(@Nonnull String segmentName);
+
+  /**
+   * Check if a segment is recently deleted.
+   */
+  boolean isRecentlyDeleted(@Nonnull String segmentName);
+
+  /**
    * Acquires all segments of the table.
    * <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
    *
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 33e0db5..4ab000c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.executor;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -124,7 +125,24 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
 
     TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
     Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType);
-    List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(queryRequest.getSegmentsToQuery());
+
+    // acquire the segments
+    int missingSegments = 0;
+    List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+    List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
+    for (String segmentName : segmentsToQuery) {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager != null) {
+        segmentDataManagers.add(segmentDataManager);
+      } else {
+        if (!tableDataManager.isRecentlyDeleted(segmentName)) {
+          LOGGER.error("Could not find segment {} for table {} for requestId {}", segmentName, tableNameWithType,
+              requestId);
+          missingSegments++;
+        }
+      }
+    }
+
     int numSegmentsQueried = segmentDataManagers.size();
     boolean enableTrace = queryRequest.isEnableTrace();
     if (enableTrace) {
@@ -195,6 +213,19 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     long queryProcessingTime = queryProcessingTimer.getDurationMs();
     dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried));
     dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime));
+
+    if (missingSegments > 0) {
+      // TODO: add this exception to the datatable after verifying the metrics
+      // Currently, given the deleted segments cache is in-memory only, a server restart will reset it.
+      // We might end up sending partial-response metadata in such cases. It appears that the likelihood of
+      // this occurrence is low; i.e., segment has to be retained out and the server must be restarted while the
+      // broker view is still behind. We would however like to validate that and/or conf control this based on
+      // data.
+      /*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR,
+          "Could not find " + missingSegments + " segments on the server"));*/
+      _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, missingSegments);
+    }
+
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime);
     LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
     return dataTable;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9b28f5a..a19b288 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -158,6 +158,11 @@ public class BaseTableDataManagerTest {
     // Removing the segment again is fine.
     tableDataManager.removeSegment(segmentName);
 
+    // Delete the segment
+    tableDataManager.notifySegmentDeleted(segmentName);
+    // check that it is recorded as deleted
+    Assert.assertTrue(tableDataManager.isRecentlyDeleted(segmentName));
+
     // Add a new segment and remove it in order this time.
     final String anotherSeg = "AnotherSegment";
     ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index b6ce1e2..a4b3241 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -68,6 +68,7 @@ public class QueryExecutorTest {
   private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
   private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
 
+  private InstanceDataManager _instanceDataManager;
   private ServerMetrics _serverMetrics;
   private QueryExecutor _queryExecutor;
 
@@ -105,8 +106,8 @@ public class QueryExecutorTest {
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
     }
-    InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
-    when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
+    _instanceDataManager = mock(InstanceDataManager.class);
+    when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
 
     // Set up the query executor
     resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
@@ -115,7 +116,7 @@ public class QueryExecutorTest {
     queryExecutorConfig.setDelimiterParsingDisabled(false);
     queryExecutorConfig.load(new File(resourceUrl.getFile()));
     _queryExecutor = new ServerQueryExecutorV1Impl();
-    _queryExecutor.init(queryExecutorConfig, instanceDataManager, _serverMetrics);
+    _queryExecutor.init(queryExecutorConfig, _instanceDataManager, _serverMetrics);
   }
 
   @Test
@@ -154,6 +155,49 @@ public class QueryExecutorTest {
     Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0);
   }
 
+  @Test
+  public void testDeletedSegmentQuery() {
+    String query = "SELECT count(*) FROM " + TABLE_NAME;
+    _instanceDataManager.notifySegmentDeleted(TABLE_NAME, _segmentNames.get(0));
+
+    InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(_segmentNames);
+    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+
+    for (String key : instanceResponse.getMetadata().keySet()) {
+      if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
+        Assert.fail("Response should not contain exceptions");
+      }
+    }
+  }
+
+  // TODO: enable this when the code is updated to set the exception
+  @Test(enabled=false)
+  public void testMissingSegmentQuery() {
+    String query = "SELECT count(*) FROM " + TABLE_NAME;
+
+    List<String> searchSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE + 1);
+    searchSegments.addAll(_segmentNames);
+    searchSegments.add("NON_EXISTENT_SEGMENT");
+
+    InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query));
+    instanceRequest.setSearchSegments(searchSegments);
+    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+
+    boolean exception = false;
+    for (String key : instanceResponse.getMetadata().keySet()) {
+      if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
+        // "null" below stems from a quirk around how the processing exception is built
+        Assert.assertEquals("null:\nCould not find 1 segments on the server", instanceResponse.getMetadata().get(key));
+        exception = true;
+      }
+    }
+    Assert.assertTrue(exception, "Expected missing segment exception");
+  }
+
+
   @AfterClass
   public void tearDown() {
     for (IndexSegment segment : _indexSegments) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 2e33aa3..d12645d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -152,6 +152,22 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   }
 
   @Override
+  public void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName) {
+    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+    if (tableDataManager != null) {
+      tableDataManager.notifySegmentAdded(segmentName);
+    }
+  }
+
+  @Override
+  public void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName) {
+    TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+    if (tableDataManager != null) {
+      tableDataManager.notifySegmentDeleted(segmentName);
+    }
+  }
+
+  @Override
   public void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName)
       throws Exception {
     LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index eea3b57..d915373 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -166,6 +166,8 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
         } else {
           _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName);
         }
+        // handle any book-keeping after a segment is added
+        _instanceDataManager.notifySegmentAdded(tableNameWithType, segmentName);
       } catch (Exception e) {
         _logger.error("Caught exception in state transition from OFFLINE -> ONLINE for resource: {}, partition: {}",
             tableNameWithType, segmentName, e);
@@ -196,6 +198,9 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       String tableNameWithType = message.getResourceName();
       String segmentName = message.getPartitionName();
 
+      // handle any additional book-keeping that needs to be done when a segment is dropped
+      _instanceDataManager.notifySegmentDeleted(tableNameWithType, segmentName);
+
       // This method might modify the file on disk. Use segment lock to prevent race condition
       Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
       try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org