You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/28 19:37:40 UTC
[incubator-pinot] branch metric_after_periodic_task_run updated:
Some more refactoring to keep counting of numTablesProcessed in base class
of periodic tasks
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch metric_after_periodic_task_run
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/metric_after_periodic_task_run by this push:
new a75222a Some more refactoring to keep counting of numTablesProcessed in base class of periodic tasks
a75222a is described below
commit a75222a944928615c653e67a4fa2e2c9034cf993
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 28 11:37:27 2019 -0800
Some more refactoring to keep counting of numTablesProcessed in base class of periodic tasks
---
.../controller/helix/SegmentStatusChecker.java | 17 +++--
.../helix/core/minion/PinotTaskManager.java | 8 ++-
.../core/periodictask/ControllerPeriodicTask.java | 27 +++++---
.../core/relocation/RealtimeSegmentRelocator.java | 18 +++---
.../helix/core/retention/RetentionManager.java | 73 ++++++++++------------
.../BrokerResourceValidationManager.java | 30 +++++----
.../validation/OfflineSegmentIntervalChecker.java | 28 ++++-----
.../RealtimeSegmentValidationManager.java | 42 ++++++-------
.../periodictask/ControllerPeriodicTaskTest.java | 7 ++-
9 files changed, 121 insertions(+), 129 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d971e1e..84bf706 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -77,7 +77,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_realTimeTableCount = 0;
_offlineTableCount = 0;
_disabledTableCount = 0;
@@ -94,20 +93,18 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- updateSegmentMetrics(tableNameWithType);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
+ updateSegmentMetrics(tableNameWithType);
+ }
- // Remove the metric for this table
- resetTableMetrics(tableNameWithType);
- }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
}
@Override
protected void postprocess() {
- super.postprocess();
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, _realTimeTableCount);
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, _offlineTableCount);
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, _disabledTableCount);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index dea29c0..ed3bad6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -102,7 +102,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
_taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
@@ -130,7 +129,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
}
}
}
- _numTablesProcessed ++;
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in PinotTaskManager for table {}", tableNameWithType, e);
}
@Override
@@ -150,7 +153,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
_metricsRegistry.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
- super.postprocess();
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index dc06af0..e7ddbb3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -43,8 +43,6 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
private volatile boolean _stopPeriodicTask;
private volatile boolean _periodicTaskInProgress;
- protected int _numTablesProcessed;
-
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
@@ -123,16 +121,28 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
*/
protected void process(List<String> tableNamesWithType) {
if (!shouldStopPeriodicTask()) {
+
+ int numTablesProcessed = 0;
preprocess();
+
for (String tableNameWithType : tableNamesWithType) {
if (shouldStopPeriodicTask()) {
LOGGER.info("Skip processing table {} and all the remaining tables for task {}.", tableNameWithType,
getTaskName());
break;
}
- processTable(tableNameWithType);
+ try {
+ processTable(tableNameWithType);
+ numTablesProcessed++;
+ } catch (Exception e) {
+ exceptionHandler(tableNameWithType, e);
+ }
}
+
postprocess();
+ _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, getTaskName(),
+ numTablesProcessed);
+
} else {
LOGGER.info("Skip processing all tables for task {}", getTaskName());
}
@@ -141,9 +151,7 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
/**
* This method runs before processing all tables
*/
- protected void preprocess() {
- _numTablesProcessed = 0;
- }
+ protected abstract void preprocess();
/**
* Execute the controller periodic task for the given table
@@ -154,10 +162,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
/**
* This method runs after processing all tables
*/
- protected void postprocess() {
- _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, getTaskName(),
- _numTablesProcessed);
- }
+ protected abstract void postprocess();
+
+ protected abstract void exceptionHandler(String tableNameWithType, Exception e);
@VisibleForTesting
protected boolean shouldStopPeriodicTask() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index deb03a9..5cbcb48 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -69,25 +69,23 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
- runRelocation(tableNameWithType);
- _numTablesProcessed ++;
- }
- } catch (Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+ runRelocation(tableNameWithType);
}
}
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 4e493cc..53f9b35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -71,60 +71,55 @@ public class RetentionManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
- manageRetentionForTable(tableNameWithType);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
- }
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ manageRetentionForTable(tableNameWithType);
}
@Override
protected void postprocess() {
LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
}
private void manageRetentionForTable(String tableNameWithType) {
- try {
- // Build retention strategy from table config
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
- return;
- }
- SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
- String segmentPushType = validationConfig.getSegmentPushType();
- if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
- LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
- return;
- }
- String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
- String retentionTimeValue = validationConfig.getRetentionTimeValue();
- RetentionStrategy retentionStrategy;
- try {
- retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
- Long.parseLong(retentionTimeValue));
- } catch (Exception e) {
- LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
- return;
- }
- // Scan all segment ZK metadata and purge segments if necessary
- if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
- manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
- } else {
- manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
- }
+ // Build retention strategy from table config
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
+ return;
+ }
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String segmentPushType = validationConfig.getSegmentPushType();
+ if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
+ LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+ return;
+ }
+ String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+ String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ RetentionStrategy retentionStrategy;
+ try {
+ retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
+ Long.parseLong(retentionTimeValue));
} catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
+ LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
+ return;
+ }
+
+ // Scan all segment ZK metadata and purge segments if necessary
+ if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
+ manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+ } else {
+ manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index ebcc5b6..c71a4ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -46,33 +46,31 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
_instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
- return;
- }
-
- // Rebuild broker resource
- Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
- _numTablesProcessed ++;
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+ return;
}
+
+ // Rebuild broker resource
+ Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
}
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 8610347..ca9563b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -57,27 +57,19 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
protected void processTable(String tableNameWithType) {
- try {
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
- CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
- return;
- }
-
- validateOfflineSegmentPush(tableConfig);
- _numTablesProcessed ++;
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+ return;
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
+ validateOfflineSegmentPush(tableConfig);
}
}
@@ -215,7 +207,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 70fdbe9..a43d63b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -67,7 +67,6 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- super.preprocess();
// Update realtime document counts only if certain time has passed after previous run
_updateRealtimeDocumentCount = false;
long currentTimeMs = System.currentTimeMillis();
@@ -81,29 +80,24 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
-
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
- return;
- }
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
- if (_updateRealtimeDocumentCount) {
- updateRealtimeDocumentCount(tableConfig);
- }
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+ return;
+ }
- Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
- }
- _numTablesProcessed ++;
+ if (_updateRealtimeDocumentCount) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+
+ Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
}
}
@@ -155,7 +149,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
@Override
protected void postprocess() {
- super.postprocess();
+ }
+
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating realtime table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 5c59221..e20cdc8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -73,7 +73,6 @@ public class ControllerPeriodicTaskTest {
@Override
public void processTable(String tableNameWithType) {
_tablesProcessed.getAndIncrement();
- _numTablesProcessed ++;
}
};
@@ -164,7 +163,6 @@ public class ControllerPeriodicTaskTest {
@Override
protected void preprocess() {
- super.preprocess();
}
@Override
@@ -174,9 +172,12 @@ public class ControllerPeriodicTaskTest {
@Override
public void postprocess() {
- super.postprocess();
}
+ @Override
+ public void exceptionHandler(String tableNameWithType, Exception e) {
+
+ }
@Override
public void stopTask() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org