You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/03/31 05:25:13 UTC

[pinot] branch master updated: Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 51904fb  Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)
51904fb is described below

commit 51904fb99e4368c430503b2bf0eb5c42ce5663d1
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Wed Mar 30 22:24:51 2022 -0700

    Remove the tableDataManager deletion mechanism when the number of segments drops to 0 to avoid race conditions. (#8422)
    
    Add TableDeletionMessage, the tableDataManager can only be removed upon receiving the TableDeletionMessage when the table is dropped.
---
 .../common/messages/TableDeletionMessage.java      | 46 ++++++++++++++++++
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../helix/core/PinotHelixResourceManager.java      | 54 ++++++++++++++++++++--
 .../core/data/manager/InstanceDataManager.java     |  6 +++
 .../pinot/integration/tests/ClusterTest.java       |  4 ++
 .../tests/BaseClusterIntegrationTestSet.java       | 22 +++++++++
 .../tests/OfflineClusterIntegrationTest.java       | 20 ++++++++
 .../tests/RealtimeClusterIntegrationTest.java      |  2 +
 .../server/starter/helix/BaseServerStarter.java    |  6 +++
 .../starter/helix/HelixInstanceDataManager.java    | 39 +++++++++++++---
 .../helix/SegmentMessageHandlerFactory.java        | 25 ++++++++++
 11 files changed, 216 insertions(+), 9 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java
new file mode 100644
index 0000000..5b05ab0
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/TableDeletionMessage.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import javax.annotation.Nonnull;
+import org.apache.helix.model.Message;
+
+/**
+ * This Helix message is sent from the controller to the servers to remove TableDataManager when the table is deleted.
+ */
+public class TableDeletionMessage extends Message {
+  public static final String DELETE_TABLE_MSG_SUB_TYPE = "DELETE_TABLE";
+
+  public TableDeletionMessage(@Nonnull String tableNameWithType) {
+    super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString());
+    setResourceName(tableNameWithType);
+    setMsgSubType(DELETE_TABLE_MSG_SUB_TYPE);
+    // Give it infinite time to process the message, as long as session is alive
+    setExecutionTimeout(-1);
+  }
+
+  public TableDeletionMessage(Message message) {
+    super(message.getRecord());
+    String msgSubType = message.getMsgSubType();
+    Preconditions.checkArgument(msgSubType.equals(DELETE_TABLE_MSG_SUB_TYPE),
+        "Invalid message sub type: " + msgSubType + " for TableDeletionMessage");
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 4a9b290..e6512df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -33,6 +33,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   QUERY_EXECUTION_EXCEPTIONS("exceptions", false),
   HELIX_ZOOKEEPER_RECONNECTS("reconnects", true),
   DELETED_SEGMENT_COUNT("segments", false),
+  DELETE_TABLE_FAILURES("tables", false),
   REALTIME_ROWS_CONSUMED("rows", true),
   INVALID_REALTIME_ROWS_DROPPED("rows", false),
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ecbd3b7..3b90117 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -91,6 +91,7 @@ import org.apache.pinot.common.messages.RoutingTableRebuildMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
+import org.apache.pinot.common.messages.TableDeletionMessage;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -1757,6 +1758,17 @@ public class PinotHelixResourceManager {
     HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, offlineTableName);
     LOGGER.info("Deleting table {}: Removed from broker resource", offlineTableName);
 
+    // Drop the table on servers
+    // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
+    //      instead of servers. This is because if externalview gets updated with significant delay,
+    //      we may have the race condition for table recreation that the new table will use the old states
+    //      (old table data manager) on the servers.
+    //      Steps needed:
+    //      1. Drop the helix resource first (set idealstate as null)
+    //      2. Wait for the externalview to converge
+    //      3. Get servers for the tenant, and send delete table message to these servers
+    deleteTableOnServer(offlineTableName);
+
     // Drop the table
     if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(offlineTableName)) {
       _helixAdmin.dropResource(_helixClusterName, offlineTableName);
@@ -1800,6 +1812,11 @@ public class PinotHelixResourceManager {
     HelixHelper.removeResourceFromBrokerIdealState(_helixZkManager, realtimeTableName);
     LOGGER.info("Deleting table {}: Removed from broker resource", realtimeTableName);
 
+    // Drop the table on servers
+    // TODO: Make this api idempotent and blocking by waiting for externalview to converge on controllers
+    //      instead of servers. Follow the same steps for offline tables.
+    deleteTableOnServer(realtimeTableName);
+
     // Cache the state and drop the table
     Set<String> instancesForTable = null;
     if (_helixAdmin.getResourcesInCluster(_helixClusterName).contains(realtimeTableName)) {
@@ -2053,6 +2070,36 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
   }
 
+  /**
+   * Delete the table on servers by sending table deletion message
+   */
+  private void deleteTableOnServer(String tableNameWithType) {
+    LOGGER.info("Sending delete table message for table: {}", tableNameWithType);
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource(tableNameWithType);
+    recipientCriteria.setSessionSpecific(true);
+    TableDeletionMessage tableDeletionMessage = new TableDeletionMessage(tableNameWithType);
+    ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
+
+    // Externalview can be null for newly created table, skip sending the message
+    if (_helixZkManager.getHelixDataAccessor()
+        .getProperty(_helixZkManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType)) == null) {
+      LOGGER.warn("No delete table message sent for newly created table: {} as the externalview is null.",
+          tableNameWithType);
+      return;
+    }
+    // Infinite timeout on the recipient
+    int timeoutMs = -1;
+    int numMessagesSent = messagingService.send(recipientCriteria, tableDeletionMessage, null, timeoutMs);
+    if (numMessagesSent > 0) {
+      LOGGER.info("Sent {} delete table messages for table: {}", numMessagesSent, tableNameWithType);
+    } else {
+      LOGGER.warn("No delete table message sent for table: {}", tableNameWithType);
+    }
+  }
+
   public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
       SegmentZKMetadata segmentZKMetadata, int expectedVersion, String downloadUrl, @Nullable String crypter,
       long segmentSizeInBytes) {
@@ -3285,10 +3332,11 @@ public class PinotHelixResourceManager {
             tableNameWithType, segmentsToCheck));
   }
 
-  private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
+  public Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) {
     ExternalView externalView = getTableExternalView(tableNameWithType);
-    Preconditions
-        .checkState(externalView != null, String.format("External view is null for table (%s)", tableNameWithType));
+    if (externalView == null) {
+      return Collections.emptySet();
+    }
     Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
     Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
     for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 14d98b3..a879953 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -67,6 +67,12 @@ public interface InstanceDataManager {
   void shutDown();
 
   /**
+   * Delete a table.
+   */
+  void deleteTable(String tableNameWithType)
+      throws Exception;
+
+  /**
    * Adds a segment from local disk into an OFFLINE table.
    */
   void addOfflineSegment(String offlineTableName, String segmentName, File indexDir)
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f706477..ed28045 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -245,6 +245,10 @@ public abstract class ClusterTest extends ControllerTest {
     }
   }
 
+  protected List<HelixServerStarter> getServerStarters() {
+    return _serverStarters;
+  }
+
   protected void startServerHttps() {
     FileUtils.deleteQuietly(new File(Server.DEFAULT_INSTANCE_BASE_DIR));
     _serverStarters = new ArrayList<>();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 9d12aca..58019ce 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -30,6 +30,7 @@ import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
+import org.apache.pinot.server.starter.helix.HelixServerStarter;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -73,6 +74,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
   }
 
   /**
+   * Test server table data manager deletion after the table is dropped
+   */
+  protected void cleanupTestTableDataManager(String tableNameWithType)
+      throws Exception {
+    List<HelixServerStarter> serverStarters = getServerStarters();
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        for (HelixServerStarter serverStarter : serverStarters) {
+          if (serverStarter.getServerInstance().getInstanceDataManager().getTableDataManager(tableNameWithType)
+              != null) {
+            return false;
+          }
+        }
+        return true;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Failed to delete table data managers");
+  }
+
+  /**
    * Test hardcoded queries.
    * <p>NOTE:
    * <p>For queries with <code>LIMIT</code>, need to remove limit or add <code>LIMIT 10000</code> to the H2 SQL query
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 5936f68..eb2994c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -32,7 +32,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
@@ -71,6 +73,8 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.function.scalar.StringFunctions.decodeUrl;
 import static org.apache.pinot.common.function.scalar.StringFunctions.encodeUrl;
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
+import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
 import static org.testng.Assert.*;
 
 
@@ -409,7 +413,23 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       assertEquals(segmentsZKMetadata.size(), 1);
       assertNotEquals(segmentsZKMetadata.get(0).getRefreshTime(), Long.MIN_VALUE);
     }
+    waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
     dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+    cleanupTestTableDataManager(offlineTableName);
+  }
+
+  private void waitForNumOfSegmentsBecomeOnline(String tableNameWithType, int numSegments)
+      throws InterruptedException, TimeoutException {
+    long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
+    do {
+      Set<String> onlineSegments = _helixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType);
+      if (onlineSegments.size() == numSegments) {
+        return;
+      }
+      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+    } while (System.currentTimeMillis() < endTimeMs);
+    throw new TimeoutException(String
+        .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s)", tableNameWithType));
   }
 
   @Test(dependsOnMethods = "testRangeIndexTriggering")
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index b3c3df4..6208c36 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -167,6 +168,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
   public void tearDown()
       throws Exception {
     dropRealtimeTable(getTableName());
+    cleanupTestTableDataManager(TableNameBuilder.REALTIME.tableNameWithType(getTableName()));
     stopServer();
     stopBroker();
     stopController();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 19467eb..d940ccb 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.server.starter.helix;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -718,6 +719,11 @@ public abstract class BaseServerStarter implements ServiceStartable {
     return _serverConf;
   }
 
+  @VisibleForTesting
+  public ServerInstance getServerInstance() {
+    return _serverInstance;
+  }
+
   /**
    * Helper method to set system resource info into instance config.
    *
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 21d3c90..3cc1296 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
@@ -39,6 +40,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -73,6 +75,9 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public class HelixInstanceDataManager implements InstanceDataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixInstanceDataManager.class);
+  // TODO: Make this configurable
+  private static final long EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS = 20 * 60_000L; // 20 minutes
+  private static final long EXTERNAL_VIEW_CHECK_INTERVAL_MS = 1_000L; // 1 second
 
   private final ConcurrentHashMap<String, TableDataManager> _tableDataManagerMap = new ConcurrentHashMap<>();
 
@@ -185,17 +190,39 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   }
 
   @Override
+  public void deleteTable(String tableNameWithType)
+      throws Exception {
+    // Wait externalview to converge
+    long endTimeMs = System.currentTimeMillis() + EXTERNAL_VIEW_DROPPED_MAX_WAIT_MS;
+    do {
+      ExternalView externalView = _helixManager.getHelixDataAccessor()
+          .getProperty(_helixManager.getHelixDataAccessor().keyBuilder().externalView(tableNameWithType));
+      if (externalView == null) {
+        LOGGER.info("ExternalView converged for the table to delete: {}", tableNameWithType);
+        _tableDataManagerMap.compute(tableNameWithType, (k, v) -> {
+          if (v != null) {
+            v.shutDown();
+            LOGGER.info("Removed table: {}", tableNameWithType);
+          } else {
+            LOGGER.warn("Failed to find table data manager for table: {}, skip removing the table", tableNameWithType);
+          }
+          return null;
+        });
+        return;
+      }
+      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+    } while (System.currentTimeMillis() < endTimeMs);
+    throw new TimeoutException(
+        "Timeout while waiting for ExternalView to converge for the table to delete: " + tableNameWithType);
+  }
+
+  @Override
   public void removeSegment(String tableNameWithType, String segmentName) {
     LOGGER.info("Removing segment: {} from table: {}", segmentName, tableNameWithType);
     _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, v) -> {
       v.removeSegment(segmentName);
       LOGGER.info("Removed segment: {} from table: {}", segmentName, k);
-      if (v.getNumSegments() == 0) {
-        v.shutDown();
-        return null;
-      } else {
-        return v;
-      }
+      return v;
     });
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index d00a558..ed6e9b6 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -26,6 +26,7 @@ import org.apache.helix.model.Message;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
+import org.apache.pinot.common.messages.TableDeletionMessage;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -58,6 +59,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context);
       case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
         return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
+      case TableDeletionMessage.DELETE_TABLE_MSG_SUB_TYPE:
+        return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -144,6 +147,28 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private class TableDeletionMessageHandler extends DefaultMessageHandler {
+    TableDeletionMessageHandler(TableDeletionMessage tableDeletionMessage, ServerMetrics metrics,
+        NotificationContext context) {
+      super(tableDeletionMessage, metrics, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage()
+        throws InterruptedException {
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      _logger.info("Handling table deletion message");
+      try {
+        _instanceDataManager.deleteTable(_tableNameWithType);
+        helixTaskResult.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETE_TABLE_FAILURES, 1);
+        Utils.rethrowException(e);
+      }
+      return helixTaskResult;
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org