You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/03/16 03:45:55 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6682: Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host

Jackie-Jiang commented on a change in pull request #6682:
URL: https://github.com/apache/incubator-pinot/pull/6682#discussion_r594840078



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -46,39 +46,58 @@
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
   private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);

Review comment:
       (nit) add final

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -46,39 +46,58 @@
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
   private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new HashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>();
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
+  }
+
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
+    }
+  }
+
+  public static boolean isExecutorShutdown() {

Review comment:
       annotate with VisibleForTestOnly

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
##########
@@ -45,6 +46,11 @@ public static void init(InstanceDataManagerConfig instanceDataManagerConfig) {
     if (maxParallelBuilds > 0) {
       _segmentBuildSemaphore = new Semaphore(maxParallelBuilds, true);
     }
+    SegmentBuildTimeLeaseExtender.initExecutor();

Review comment:
       Suggest moving the `initExecutor()` and `shutdownExecutor()` to `HelixInstanceDataManager`.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -46,39 +46,58 @@
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
   private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new HashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>();
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
+  }
+
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
+    }
+  }
+
+  public static boolean isExecutorShutdown() {
+    return _executor == null;
   }
 
-  public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
+  public static synchronized SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+  }
+
+  public static synchronized SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId,
       ServerMetrics serverMetrics, String tableNameWithType) {
-    SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+    SegmentBuildTimeLeaseExtender leaseExtender = TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
     if (leaseExtender != null) {
-      LOGGER.warn("Instance already exists");
+      LOGGER.warn("Lease extender for Table: {} already exists", tableNameWithType);
     } else {
       leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
-      INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
+      TABLE_TO_LEASE_EXTENDER.put(tableNameWithType, leaseExtender);
     }
     return leaseExtender;
   }
 
   private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
     _instanceId = instanceId;
     _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
-    _executor = new ScheduledThreadPoolExecutor(1);
   }
 
   public void shutDown() {
-    if (_executor != null) {
-      _executor.shutdownNow();
-      _executor = null;
+    for (Map.Entry<String, Future> entry : _segmentToFutureMap.entrySet()) {
+      Future future = entry.getValue();
+      if (future != null) {

Review comment:
       I don't think future can be null here

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
##########
@@ -46,39 +46,58 @@
   // Retransmit lease request 10% before lease expires.
   private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10);
   private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class);
-  private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1);
+  private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new HashMap<>();
+  private static ScheduledExecutorService _executor;
 
-  private ScheduledExecutorService _executor;
   private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>();
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) {
-    return INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
+  public static void initExecutor() {
+    _executor = new ScheduledThreadPoolExecutor(1);
+  }
+
+  public static void shutdownExecutor() {
+    if (_executor != null) {
+      _executor.shutdownNow();
+      _executor = null;
+    }
+  }
+
+  public static boolean isExecutorShutdown() {
+    return _executor == null;
   }
 
-  public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
+  public static synchronized SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) {
+    return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType);
+  }
+
+  public static synchronized SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId,

Review comment:
       You may use `ConcurrentHashMap.compute()` which is atomic. Not very important though because it is not on query path




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org