You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/03/31 05:25:13 UTC
[pinot] branch master updated: Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 51904fb Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)
51904fb is described below
commit 51904fb99e4368c430503b2bf0eb5c42ce5663d1
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Wed Mar 30 22:24:51 2022 -0700
Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)
Add TableDeletionMessage; the tableDataManager can only be removed upon receiving a TableDeletionMessage when the table is dropped.
---
.../common/messages/TableDeletionMessage.java | 46 ++++++++++++++++++
.../apache/pinot/common/metrics/ServerMeter.java | 1 +
.../helix/core/PinotHelixResourceManager.java | 54 ++++++++++++++++++++--
.../core/data/manager/InstanceDataManager.java | 6 +++
.../pinot/integration/tests/ClusterTest.java | 4 ++
.../tests/BaseClusterIntegrationTestSet.java | 22 +++++++++
.../tests/OfflineClusterIntegrationTest.java | 20 ++++++++
.../tests/RealtimeClusterIntegrationTest.java | 2 +
.../server/starter/helix/BaseServerStarter.java | 6 +++
.../starter/helix/HelixInstanceDataManager.java | 39 +++++++++++++---
.../helix/SegmentMessageHandlerFactory.java | 25 ++++++++++
11 files changed, 216 insertions(+), 9 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java
new file mode 100644
index 0000000..5b05ab0
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import javax.annotation.Nonnull;
+import org.apache.helix.model.Message;
+
+/**
+ * Helix user-defined message sent from the controller to the servers to remove the TableDataManager of a deleted table.
+ */
+public class TableDeletionMessage extends Message {
+ public static final String DELETE_TABLE_MSG_SUB_TYPE = "DELETE_TABLE";
+
+ public TableDeletionMessage(@Nonnull String tableNameWithType) {
+ super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+ setResourceName(tableNameWithType);
+ setMsgSubType(DELETE_TABLE_MSG_SUB_TYPE);
+ // Give the recipient unlimited time to process the message, as long as the session is alive
+ setExecutionTimeout(-1);
+ }
+
+ public TableDeletionMessage(Message message) {
+ super(message.getRecord());
+ String msgSubType = message.getMsgSubType();
+ Preconditions.checkArgument(msgSubType.equals(DELETE_TABLE_MSG_SUB_TYPE),
+ "Invalid message sub type: " + msgSubType + " for TableDeletionMessage");
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 4a9b290..e6512df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -33,6 +33,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
QUERY_EXECUTION_EXCEPTIONS("exceptions", false),
HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
DELETED_SEGMENT_COUNT("segments", false),
+ DELETE_TABLE_FAILURES("tables", false),
REALTIME_ROWS_CONSUMED("rows", true),
INVALID_REALTIME_ROWS_DROPPED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ecbd3b7..3b90117 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -91,6 +91,7 @@ import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
import org.apache.pinot.common.messages.TableConfigRefreshMessage;
+import org.apache.pinot.common.messages.TableDeletionMessage;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -1757,6 +1758,17 @@ public class PinotHelixResourceManager {
HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);
+ // Drop the table on servers
+ // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
+ // instead of servers. This is because if externalview gets updated with significant delay,
+ // we may have the race condition for table recreation that the new table will use the old states
+ // (old table data manager) on the servers.
+ // Steps needed:
+ // 1. Drop the helix resource first (set idealstate as null)
+ // 2. Wait for the externalview to converge
+ // 3. Get servers for the tenant, and send delete table message to these servers
+ deleteTableOnServer(offlineTableName);
+
// Drop the table
if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName)) {
_helixAdmin.dropResource(_helixClusterName, offlineTableName);
@@ -1800,6 +1812,11 @@ public class PinotHelixResourceManager {
HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName);
LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName);
+ // Drop the table on servers
+ // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
+ // instead of servers. Follow the same steps for offline tables.
+ deleteTableOnServer(realtimeTableName);
+
// Cache the state and drop the table
Set<String> instancesForTable = null;
if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName)) {
@@ -2053,6 +2070,36 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
}
+ /**
+ * Asks servers to delete the table by sending a TableDeletionMessage; skipped when its externalview is null
+ */
+ private void deleteTableOnServer(String tableNameWithType) {
+ LOGGER.info("Sending delete table message for table: {}", tableNameWithType);
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName("%");
+ recipientCriteria.setResource(tableNameWithType);
+ recipientCriteria.setSessionSpecific(true);
+ TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType);
+ ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
+
+ // The externalview can be null for a newly created table; skip sending the message in that case
+ if (_helixZkManager.getHelixDataAccessor()
+ .getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)) == null) {
+ LOGGER.warn("No delete table message sent for newly created table: {} as the externalview is null.",
+ tableNameWithType);
+ return;
+ }
+ // Infinite timeout on the recipient
+ int timeoutMs = -1;
+ int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType);
+ } else {
+ LOGGER.warn("No delete table message sent for table: {}", tableNameWithType);
+ }
+ }
+
public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter,
long segmentSizeInBytes) {
@@ -3285,10 +3332,11 @@ public class PinotHelixResourceManager {
tableNameWithType, segmentsToCheck));
}
- private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
+ public Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
ExternalView externalView = getTableExternalView(tableNameWithType);
- Preconditions
- .checkState(externalView != null, String.format("External view is null for table (%s)", tableNameWithType));
+ if (externalView == null) {
+ return Collections.emptySet();
+ }
Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 14d98b3..a879953 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -67,6 +67,12 @@ public interface InstanceDataManager {
void shutDown();
/**
+ * Delete a table.
+ */
+ void deleteTable(String tableNameWithType)
+ throws Exception;
+
+ /**
* Adds a segment from local disk into an OFFLINE table.
*/
void addOfflineSegment(String offlineTableName, String segmentName, File indexDir)
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f706477..ed28045 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -245,6 +245,10 @@ public abstract class ClusterTest extends ControllerTest {
}
}
+ protected List<HelixServerStarter> getServerStarters() {
+ return _serverStarters;
+ }
+
protected void startServerHttps() {
FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
_serverStarters = new ArrayList<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 9d12aca..58019ce 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -30,6 +30,7 @@ import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
+import org.apache.pinot.server.starter.helix.HelixServerStarter;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -73,6 +74,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
}
/**
+ * Checks (via TestUtils.waitForCondition) that no server still has a table data manager for the dropped table
+ */
+ protected void cleanupTestTableDataManager(String tableNameWithType)
+ throws Exception {
+ List<HelixServerStarter> serverStarters = getServerStarters();
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ for (HelixServerStarter serverStarter : serverStarters) {
+ if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
+ != null) {
+ return false;
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Failed to delete table data managers");
+ }
+
+ /**
* Test hardcoded queries.
* <p>NOTE:
* <p>For queries with <code>LIMIT</code>, need to remove limit or add <code>LIMIT 10000</code> to the H2 SQL query
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 5936f68..eb2994c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -32,7 +32,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
@@ -71,6 +73,8 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.function.scalar.StringFunctions.decodeUrl;
import static org.apache.pinot.common.function.scalar.StringFunctions.encodeUrl;
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
import static org.testng.Assert.*;
@@ -409,7 +413,23 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(segmentsZKMetadata.size(), 1);
assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), Long.MIN_VALUE);
}
+ waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+ cleanupTestTableDataManager(offlineTableName);
+ }
+
+ private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments)
+ throws InterruptedException, TimeoutException {
+ long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
+ do {
+ Set<String> onlineSegments = _helixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType);
+ if (onlineSegments.size() == numSegments) {
+ return;
+ }
+ Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+ } while (System.currentTimeMillis() < endTimeMs);
+ throw new TimeoutException(String
+ .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
}
@Test(dependsOnMethods = "testRangeIndexTriggering")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index b3c3df4..6208c36 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -167,6 +168,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
public void tearDown()
throws Exception {
dropRealtimeTable(getTableName());
+ cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
stopServer();
stopBroker();
stopController();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 19467eb..d940ccb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.server.starter.helix;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
@@ -718,6 +719,11 @@ public abstract class BaseServerStarter implements ServiceStartable {
return _serverConf;
}
+ @VisibleForTesting
+ public ServerInstance getServerInstance() {
+ return _serverInstance;
+ }
+
/**
* Helper method to set system resource info into instance config.
*
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 21d3c90..3cc1296 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import javax.annotation.Nullable;
@@ -39,6 +40,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -73,6 +75,9 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
public class HelixInstanceDataManager implements InstanceDataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class);
+ // TODO: Make this configurable
+ private static final long EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 * 60_000L; // 20 minutes
+ private static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>();
@@ -185,17 +190,39 @@ public class HelixInstanceDataManager implements InstanceDataManager {
}
@Override
+ public void deleteTable(String tableNameWithType)
+ throws Exception {
+ // Poll until the ExternalView of the table is gone (null), then shut down and remove its table data manager
+ long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS;
+ do {
+ ExternalView externalView = _helixManager.getHelixDataAccessor()
+ .getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType));
+ if (externalView == null) {
+ LOGGER.info("ExternalView converged for the table to delete: {}", tableNameWithType);
+ _tableDataManagerMap.compute(tableNameWithType, (k, v) -> {
+ if (v != null) {
+ v.shutDown();
+ LOGGER.info("Removed table: {}", tableNameWithType);
+ } else {
+ LOGGER.warn("Failed to find table data manager for table: {}, skip removing the table", tableNameWithType);
+ }
+ return null;
+ });
+ return;
+ }
+ Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+ } while (System.currentTimeMillis() < endTimeMs);
+ throw new TimeoutException(
+ "Timeout while waiting for ExternalView to converge for the table to delete: " + tableNameWithType);
+ }
+
+ @Override
public void removeSegment(String tableNameWithType, String segmentName) {
LOGGER.info("Removing segment: {} from table: {}", segmentName, tableNameWithType);
_tableDataManagerMap.computeIfPresent(tableNameWithType, (k, v) -> {
v.removeSegment(segmentName);
LOGGER.info("Removed segment: {} from table: {}", segmentName, k);
- if (v.getNumSegments() == 0) {
- v.shutDown();
- return null;
- } else {
- return v;
- }
+ return v;
});
}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index d00a558..ed6e9b6 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -26,6 +26,7 @@ import org.apache.helix.model.Message;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.messages.SegmentRefreshMessage;
import org.apache.pinot.common.messages.SegmentReloadMessage;
+import org.apache.pinot.common.messages.TableDeletionMessage;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -58,6 +59,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context);
case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
+ case TableDeletionMessage.DELETE_TABLE_MSG_SUB_TYPE:
+ return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
default:
LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
message.getPartitionName());
@@ -144,6 +147,28 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
}
}
+ private class TableDeletionMessageHandler extends DefaultMessageHandler {
+ TableDeletionMessageHandler(TableDeletionMessage tableDeletionMessage, ServerMetrics metrics,
+ NotificationContext context) {
+ super(tableDeletionMessage, metrics, context);
+ }
+
+ @Override
+ public HelixTaskResult handleMessage()
+ throws InterruptedException {
+ HelixTaskResult helixTaskResult = new HelixTaskResult();
+ _logger.info("Handling table deletion message");
+ try {
+ _instanceDataManager.deleteTable(_tableNameWithType);
+ helixTaskResult.setSuccess(true);
+ } catch (Exception e) {
+ _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
+ Utils.rethrowException(e);
+ }
+ return helixTaskResult;
+ }
+ }
+
private static class DefaultMessageHandler extends MessageHandler {
final String _segmentName;
final String _tableNameWithType;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org