You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/03/16 07:39:23 UTC

[incubator-pinot] branch master updated: Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ec38f7  Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682)
4ec38f7 is described below

commit 4ec38f79315d4017e5e2ac45e8989fa7fc4584fa
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Tue Mar 16 00:38:58 2021 -0700

    Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682)
    
    Co-authored-by: Jiapeng Tao <ji...@jiatao-mn1.linkedin.biz>
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  2 +-
 .../manager/realtime/RealtimeTableDataManager.java |  2 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java    | 59 +++++++++++++++-------
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 37 ++++++++++++--
 .../starter/helix/HelixInstanceDataManager.java    |  8 ++-
 5 files changed, 83 insertions(+), 25 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 82a86d2..89a7210 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1115,7 +1115,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _serverMetrics = serverMetrics;
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
-    _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
+    _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
     _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 73d30e1..7417c3a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -122,7 +122,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Override
   protected void doInit() {
-    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
+    _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType);
 
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 69d7e80..725dc95 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -45,42 +45,63 @@ public class SegmentBuildTimeLeaseExtender {
   private static final int EXTRA_TIME_SECONDS = 120;
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
-  private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>();
   private final String _instanceId;
+  private final String _tableNameWithType;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
   }
 
-  public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
-      ServerMetrics serverMetrics, String tableNameWithType) {
-    SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
-    if (leaseExtender != null) {
-      LOGGER.warn("Instance already exists");
-    } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
-      INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
     }
-    return leaseExtender;
+  }
+
+  @VisibleForTesting
+  public static boolean isExecutorShutdown() {
+    return _executor == null;
+  }
+
+  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+  }
+
+  public static SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId,
+      ServerMetrics serverMetrics, String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> {
+      if (v == null) {
+        return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
+      } else {
+        LOGGER.warn("Lease extender for Table: {} already exists", tableNameWithType);
+        return v;
+      }
+    });
   }
 
   private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
     _instanceId = instanceId;
+    _tableNameWithType = tableNameWithType;
     _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
-    _executor = new ScheduledThreadPoolExecutor(1);
   }
 
   public void shutDown() {
-    if (_executor != null) {
-      _executor.shutdownNow();
-      _executor = null;
+    for (Map.Entry<String, Future> entry : _segmentToFutureMap.entrySet()) {
+      Future future = entry.getValue();
+      boolean cancelled = future.cancel(true);
+      if (!cancelled) {
+        LOGGER.warn("Task could not be cancelled for {}" + entry.getKey());
+      }
     }
     _segmentToFutureMap.clear();
+    TABLE_TO_LEASE_EXTENDER.remove(_tableNameWithType);
   }
 
   /**
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 6da00df..39cf466 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -36,7 +38,11 @@ import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
+import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
+import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
@@ -58,6 +64,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -128,9 +136,9 @@ public class LLRealtimeSegmentDataManagerTest {
     return JsonUtils.stringToObject(_tableConfigJson, TableConfig.class);
   }
 
-  private RealtimeTableDataManager createTableDataManager() {
+  private RealtimeTableDataManager createTableDataManager(TableConfig tableConfig) {
     final String instanceId = "server-1";
-    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), _tableName);
+    SegmentBuildTimeLeaseExtender.getOrCreate(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), tableConfig.getTableName());
     RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
     when(tableDataManager.getServerInstance()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
@@ -154,7 +162,7 @@ public class LLRealtimeSegmentDataManagerTest {
     LLCRealtimeSegmentZKMetadata segmentZKMetadata = createZkMetadata();
     TableConfig tableConfig = createTableConfig();
     InstanceZKMetadata instanceZKMetadata = new InstanceZKMetadata();
-    RealtimeTableDataManager tableDataManager = createTableDataManager();
+    RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig);
     String resourceDir = _segmentDir;
     LLCSegmentName llcSegmentName = new LLCSegmentName(_segmentNameStr);
     _partitionGroupIdToSemaphoreMap.putIfAbsent(_partitionGroupId, new Semaphore(1));
@@ -169,11 +177,13 @@ public class LLRealtimeSegmentDataManagerTest {
   @BeforeClass
   public void setUp() {
     _segmentDirFile.deleteOnExit();
+    SegmentBuildTimeLeaseExtender.initExecutor();
   }
 
   @AfterClass
   public void tearDown() {
     FileUtils.deleteQuietly(_segmentDirFile);
+    SegmentBuildTimeLeaseExtender.shutdownExecutor();
   }
 
   @Test
@@ -765,6 +775,27 @@ public class LLRealtimeSegmentDataManagerTest {
     Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1);
   }
 
+  @Test
+  public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor()
+      throws Exception {
+    TableConfig tableConfig = createTableConfig();
+    tableConfig.setUpsertConfig(null);
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+
+    TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
+    when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("REALTIME");
+    when(tableDataManagerConfig.getTableName()).thenReturn(tableConfig.getTableName());
+    when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+
+    TableDataManager tableDataManager = TableDataManagerProvider
+        .getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore, mock(ServerMetrics.class),
+            mock(HelixManager.class));
+    tableDataManager.start();
+    tableDataManager.shutDown();
+    Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
+  }
+
   public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
 
     public Field _state;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index deb3a6a..30ba125 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -41,6 +41,7 @@ import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -91,9 +92,12 @@ public class HelixInstanceDataManager implements InstanceDataManager {
       Preconditions.checkState(instanceSegmentTarDir.mkdirs());
     }
 
+    // Initialize segment build time lease extender executor
+    SegmentBuildTimeLeaseExtender.initExecutor();
+    LOGGER.info("Initialized segment build time lease extender executor");
+
     // Initialize the table data manager provider
     TableDataManagerProvider.init(_instanceDataManagerConfig);
-
     LOGGER.info("Initialized Helix instance data manager");
   }
 
@@ -108,6 +112,8 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     for (TableDataManager tableDataManager : _tableDataManagerMap.values()) {
       tableDataManager.shutDown();
     }
+    SegmentBuildTimeLeaseExtender.shutdownExecutor();
+    LOGGER.info("Segment build time lease extender executor shut down");
     LOGGER.info("Helix instance data manager shut down");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org