You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2019/03/22 16:33:16 UTC
[incubator-pinot] branch master updated: Set processingException
when all queried segments cannot be acquired (#3942)
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4f2ece Set processingException when all queried segments cannot be acquired (#3942)
d4f2ece is described below
commit d4f2ecef660ab1d4efa9696a53b0623aac867c3f
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Mar 22 09:33:09 2019 -0700
Set processingException when all queried segments cannot be acquired (#3942)
* Set processingException when all queried segments cannot be acquired
* Minor fixes
* Address review comments
* Disable setting processing exception until metrics are validated
* Address review comments
* Address review comments
* Address review comments
* Address review comments
* Address review comments: Make InstanceDataManager symmetric with respect to tracking segments added and deleted
---
.../pinot/common/exception/QueryException.java | 2 +
.../core/data/manager/BaseTableDataManager.java | 48 +++++++++++++++++----
.../core/data/manager/InstanceDataManager.java | 14 ++++++
.../pinot/core/data/manager/TableDataManager.java | 15 +++++++
.../query/executor/ServerQueryExecutorV1Impl.java | 33 +++++++++++++-
.../data/manager/BaseTableDataManagerTest.java | 5 +++
.../pinot/query/executor/QueryExecutorTest.java | 50 ++++++++++++++++++++--
.../starter/helix/HelixInstanceDataManager.java | 16 +++++++
.../SegmentOnlineOfflineStateModelFactory.java | 5 +++
9 files changed, 175 insertions(+), 13 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index e0ed903..6c45897 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -41,6 +41,7 @@ public class QueryException {
public static final int SEGMENT_PLAN_EXECUTION_ERROR_CODE = 160;
public static final int COMBINE_SEGMENT_PLAN_TIMEOUT_ERROR_CODE = 170;
public static final int ACCESS_DENIED_ERROR_CODE = 180;
+ public static final int SEGMENTS_MISSING_ERROR_CODE = 190;
public static final int QUERY_EXECUTION_ERROR_CODE = 200;
// TODO: Handle these errors in broker
public static final int SERVER_SHUTTING_DOWN_ERROR_CODE = 210;
@@ -96,6 +97,7 @@ public class QueryException {
public static final ProcessingException QUERY_VALIDATION_ERROR = new ProcessingException(QUERY_VALIDATION_ERROR_CODE);
public static final ProcessingException UNKNOWN_ERROR = new ProcessingException(UNKNOWN_ERROR_CODE);
public static final ProcessingException QUOTA_EXCEEDED_ERROR = new ProcessingException(TOO_MANY_REQUESTS_ERROR_CODE);
+ public static final ProcessingException SEGMENTS_MISSING_ERROR = new ProcessingException(SEGMENTS_MISSING_ERROR_CODE);
static {
JSON_PARSING_ERROR.setMessage("JsonParsingError");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index a456691..3f074a1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -19,10 +19,13 @@
package org.apache.pinot.core.data.manager;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.helix.ZNRecord;
@@ -42,9 +45,13 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public abstract class BaseTableDataManager implements TableDataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDataManager.class);
+ // cache deleted segment names for utmost this duration
+ private static final int MAX_CACHE_DURATION_SEC = 6 * 3600; // 6 hours
protected final ConcurrentHashMap<String, SegmentDataManager> _segmentDataManagerMap = new ConcurrentHashMap<>();
+ protected Cache<String, Boolean> _deletedSegmentsCache;
+
protected TableDataManagerConfig _tableDataManagerConfig;
protected String _instanceId;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -59,6 +66,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
@Nonnull ZkHelixPropertyStore<ZNRecord> propertyStore, @Nonnull ServerMetrics serverMetrics) {
LOGGER.info("Initializing table data manager for table: {}", tableDataManagerConfig.getTableName());
+ _deletedSegmentsCache = CacheBuilder.newBuilder().expireAfterWrite(MAX_CACHE_DURATION_SEC, TimeUnit.SECONDS).build();
_tableDataManagerConfig = tableDataManagerConfig;
_instanceId = instanceId;
_propertyStore = propertyStore;
@@ -117,6 +125,8 @@ public abstract class BaseTableDataManager implements TableDataManager {
ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
SegmentDataManager oldSegmentManager = _segmentDataManagerMap.put(segmentName, newSegmentManager);
+
+ // release old segment if needed
if (oldSegmentManager == null) {
_logger.info("Added new immutable segment: {} to table: {}", segmentName, _tableNameWithType);
} else {
@@ -156,6 +166,35 @@ public abstract class BaseTableDataManager implements TableDataManager {
}
}
+ /**
+ * Called when a segment is deleted. The actual handling of segment delete is outside of this method.
+ * This method provides book-keeping around deleted segments.
+ * @param segmentName name of the segment to track.
+ */
+ public void notifySegmentDeleted(@Nonnull String segmentName) {
+ // add segment to the cache
+ _deletedSegmentsCache.put(segmentName, true);
+ }
+
+ /**
+ * Check if a segment is recently deleted.
+ *
+ * @param segmentName name of the segment to check.
+ * @return true if segment is in the cache, false otherwise
+ */
+ public boolean isRecentlyDeleted(@Nonnull String segmentName) {
+ return _deletedSegmentsCache.getIfPresent(segmentName) != null;
+ }
+
+ /**
+ * Remove a segment from the deleted cache if it is being added back.
+ *
+ * @param segmentName name of the segment that needs to removed from the cache (if needed)
+ */
+ public void notifySegmentAdded(@Nonnull String segmentName) {
+ _deletedSegmentsCache.invalidate(segmentName);
+ }
+
@Nonnull
@Override
public List<SegmentDataManager> acquireAllSegments() {
@@ -176,8 +215,6 @@ public abstract class BaseTableDataManager implements TableDataManager {
SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
segmentDataManagers.add(segmentDataManager);
- } else {
- handleMissingSegment(segmentName);
}
}
return segmentDataManagers;
@@ -189,17 +226,10 @@ public abstract class BaseTableDataManager implements TableDataManager {
if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
return segmentDataManager;
} else {
- handleMissingSegment(segmentName);
return null;
}
}
- private void handleMissingSegment(String segmentName) {
- // could not find segment
- LOGGER.error("Could not find segment " + segmentName + " for table " + _tableNameWithType);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, 1);
- }
-
@Override
public void releaseSegment(@Nonnull SegmentDataManager segmentDataManager) {
if (segmentDataManager.decreaseReferenceCount()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 85090dc..9399c7a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -79,6 +79,20 @@ public interface InstanceDataManager {
throws Exception;
/**
+ * Handles addition of a segment from the table.
+ *
+ * This method performs book keeping of added segments, especially if the deleted-cache needs to be invalidated
+ */
+ void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName);
+
+ /**
+ * Handles deletion of a segment from the table.
+ *
+ * This method performs book keeping of deleted segments.
+ */
+ void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName);
+
+ /**
* Reloads a segment in a table.
*/
void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
index c237fbf..16eae6b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/TableDataManager.java
@@ -80,6 +80,21 @@ public interface TableDataManager {
void removeSegment(@Nonnull String segmentName);
/**
+ * Track a deleted segment.
+ */
+ void notifySegmentDeleted(@Nonnull String segmentName);
+
+ /**
+ * Track addition of a segment
+ */
+ void notifySegmentAdded(@Nonnull String segmentName);
+
+ /**
+ * Check if a segment is recently deleted.
+ */
+ boolean isRecentlyDeleted(@Nonnull String segmentName);
+
+ /**
* Acquires all segments of the table.
* <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.
*
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 33e0db5..4ab000c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.executor;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -124,7 +125,24 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType);
Preconditions.checkState(tableDataManager != null, "Failed to find data manager for table: " + tableNameWithType);
- List<SegmentDataManager> segmentDataManagers = tableDataManager.acquireSegments(queryRequest.getSegmentsToQuery());
+
+ // acquire the segments
+ int missingSegments = 0;
+ List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+ List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
+ for (String segmentName : segmentsToQuery) {
+ SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+ if (segmentDataManager != null) {
+ segmentDataManagers.add(segmentDataManager);
+ } else {
+ if (!tableDataManager.isRecentlyDeleted(segmentName)) {
+ LOGGER.error("Could not find segment {} for table {} for requestId {}", segmentName, tableNameWithType,
+ requestId);
+ missingSegments++;
+ }
+ }
+ }
+
int numSegmentsQueried = segmentDataManagers.size();
boolean enableTrace = queryRequest.isEnableTrace();
if (enableTrace) {
@@ -195,6 +213,19 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
long queryProcessingTime = queryProcessingTimer.getDurationMs();
dataTable.getMetadata().put(DataTable.NUM_SEGMENTS_QUERIED, Integer.toString(numSegmentsQueried));
dataTable.getMetadata().put(DataTable.TIME_USED_MS_METADATA_KEY, Long.toString(queryProcessingTime));
+
+ if (missingSegments > 0) {
+ // TODO: add this exception to the datatable after verfying the metrics
+ // Currently, given the deleted segments cache is in-memory only, a server restart will reset it
+ // We might end up sending partial-response metadata in such cases. It appears that the likelihood of
+ // this occurence is low; ie, segment has to be retained out and the server must be restarted while the
+ // broker view is still behind. We would however like to validate that and/or conf control this based on
+ // data.
+ /*dataTable.addException(QueryException.getException(QueryException.SEGMENTS_MISSING_ERROR,
+ "Could not find " + missingSegments + " segments on the server"));*/
+ _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, missingSegments);
+ }
+
LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime);
LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
return dataTable;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 9b28f5a..a19b288 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -158,6 +158,11 @@ public class BaseTableDataManagerTest {
// Removing the segment again is fine.
tableDataManager.removeSegment(segmentName);
+ // Delete the segment
+ tableDataManager.notifySegmentDeleted(segmentName);
+ // check that it is recorded as deleted
+ Assert.assertTrue(tableDataManager.isRecentlyDeleted(segmentName));
+
// Add a new segment and remove it in order this time.
final String anotherSeg = "AnotherSegment";
ImmutableSegment ix1 = makeImmutableSegment(anotherSeg, totalDocs);
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index b6ce1e2..a4b3241 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -68,6 +68,7 @@ public class QueryExecutorTest {
private final List<ImmutableSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
private final List<String> _segmentNames = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE);
+ private InstanceDataManager _instanceDataManager;
private ServerMetrics _serverMetrics;
private QueryExecutor _queryExecutor;
@@ -105,8 +106,8 @@ public class QueryExecutorTest {
for (ImmutableSegment indexSegment : _indexSegments) {
tableDataManager.addSegment(indexSegment);
}
- InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
- when(instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
+ _instanceDataManager = mock(InstanceDataManager.class);
+ when(_instanceDataManager.getTableDataManager(TABLE_NAME)).thenReturn(tableDataManager);
// Set up the query executor
resourceUrl = getClass().getClassLoader().getResource(QUERY_EXECUTOR_CONFIG_PATH);
@@ -115,7 +116,7 @@ public class QueryExecutorTest {
queryExecutorConfig.setDelimiterParsingDisabled(false);
queryExecutorConfig.load(new File(resourceUrl.getFile()));
_queryExecutor = new ServerQueryExecutorV1Impl();
- _queryExecutor.init(queryExecutorConfig, instanceDataManager, _serverMetrics);
+ _queryExecutor.init(queryExecutorConfig, _instanceDataManager, _serverMetrics);
}
@Test
@@ -154,6 +155,49 @@ public class QueryExecutorTest {
Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0);
}
+ @Test
+ public void testDeletedSegmentQuery() {
+ String query = "SELECT count(*) FROM " + TABLE_NAME;
+ _instanceDataManager.notifySegmentDeleted(TABLE_NAME, _segmentNames.get(0));
+
+ InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query));
+ instanceRequest.setSearchSegments(_segmentNames);
+ DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+
+ for (String key : instanceResponse.getMetadata().keySet()) {
+ if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
+ Assert.fail("Response should not contain exceptions");
+ }
+ }
+ }
+
+ // TODO: enable this when the code is updated to set the exception
+ @Test(enabled=false)
+ public void testMissingSegmentQuery() {
+ String query = "SELECT count(*) FROM " + TABLE_NAME;
+
+ List<String> searchSegments = new ArrayList<>(NUM_SEGMENTS_TO_GENERATE + 1);
+ searchSegments.addAll(_segmentNames);
+ searchSegments.add("NON_EXISTENT_SEGMENT");
+
+ InstanceRequest instanceRequest = new InstanceRequest(0L, COMPILER.compileToBrokerRequest(query));
+ instanceRequest.setSearchSegments(searchSegments);
+ DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+ Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+
+ boolean exception = false;
+ for (String key : instanceResponse.getMetadata().keySet()) {
+ if (key.startsWith(DataTable.EXCEPTION_METADATA_KEY)) {
+ // "null" below stems from a quirk around how the processing exception is built
+ Assert.assertEquals("null:\nCould not find 1 segments on the server", instanceResponse.getMetadata().get(key));
+ exception = true;
+ }
+ }
+ Assert.assertTrue(exception, "Expected missing segment exception");
+ }
+
+
@AfterClass
public void tearDown() {
for (IndexSegment segment : _indexSegments) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 2e33aa3..d12645d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -152,6 +152,22 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
@Override
+ public void notifySegmentAdded(@Nonnull String tableNameWithType, @Nonnull String segmentName) {
+ TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+ if (tableDataManager != null) {
+ tableDataManager.notifySegmentAdded(segmentName);
+ }
+ }
+
+ @Override
+ public void notifySegmentDeleted(@Nonnull String tableNameWithType, @Nonnull String segmentName) {
+ TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType);
+ if (tableDataManager != null) {
+ tableDataManager.notifySegmentDeleted(segmentName);
+ }
+ }
+
+ @Override
public void reloadSegment(@Nonnull String tableNameWithType, @Nonnull String segmentName)
throws Exception {
LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType);
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index eea3b57..d915373 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -166,6 +166,8 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
} else {
_instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName);
}
+ // handle any book-keeping after a segment is added
+ _instanceDataManager.notifySegmentAdded(tableNameWithType, segmentName);
} catch (Exception e) {
_logger.error("Caught exception in state transition from OFFLINE -> ONLINE for resource: {}, partition: {}",
tableNameWithType, segmentName, e);
@@ -196,6 +198,9 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
String tableNameWithType = message.getResourceName();
String segmentName = message.getPartitionName();
+ // handle any additional book-keeping that needs to be done when a segment is dropped
+ _instanceDataManager.notifySegmentDeleted(tableNameWithType, segmentName);
+
// This method might modify the file on disk. Use segment lock to prevent race condition
Lock segmentLock = SegmentLocks.getSegmentLock(tableNameWithType, segmentName);
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org