You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/04/17 22:23:20 UTC

[incubator-pinot] branch master updated: Admin tool for listing segments with invalid intervals for offline tables (#4117)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c0120d7  Admin tool for listing segments with invalid intervals for offline tables (#4117)
c0120d7 is described below

commit c0120d72d79b176ffc955496dba3662bb018afe4
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Apr 17 15:23:15 2019 -0700

    Admin tool for listing segments with invalid intervals for offline tables (#4117)
---
 .../pinot/common/config/TableNameBuilder.java      |   7 +
 .../controller/util/SegmentIntervalUtils.java      |  74 ++++++++++
 .../validation/OfflineSegmentIntervalChecker.java  |  33 +----
 .../pinot/tools/admin/PinotAdministrator.java      |   3 +-
 .../OfflineSegmentIntervalCheckerCommand.java      | 150 +++++++++++++++++++++
 5 files changed, 240 insertions(+), 27 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TableNameBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TableNameBuilder.java
index f8e6bea..5ecac01 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TableNameBuilder.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TableNameBuilder.java
@@ -122,4 +122,11 @@ public class TableNameBuilder {
   public static boolean isTableResource(@Nonnull String resourceName) {
     return OFFLINE.tableHasTypeSuffix(resourceName) || REALTIME.tableHasTypeSuffix(resourceName);
   }
+
+  /**
+   * Return whether the given resource name represents an offline table resource.
+   */
+  public static boolean isOfflineTableResource(@Nonnull String resourceName) {
+    return OFFLINE.tableHasTypeSuffix(resourceName);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java
new file mode 100644
index 0000000..b018689
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/SegmentIntervalUtils.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
+import org.apache.pinot.common.utils.time.TimeUtils;
+import org.joda.time.Duration;
+import org.joda.time.Interval;
+
+
+/**
+ * Helper methods for segment interval validations
+ */
+public class SegmentIntervalUtils {
+
+  /**
+   * Checks that the segment time interval is non-null and that its start and end are both in the valid time range
+   */
+  public static boolean isValidInterval(Interval timeInterval) {
+    return timeInterval != null && TimeUtils.timeValueInValidRange(timeInterval.getStartMillis()) && TimeUtils
+        .timeValueInValidRange(timeInterval.getEndMillis());
+  }
+
+  /**
+   * We only want to check missing segments if the table has at least 2 segments and a time column
+   */
+  public static boolean eligibleForMissingSegmentCheck(int numSegments,
+      SegmentsValidationAndRetentionConfig validationConfig) {
+    return numSegments >= 2 && StringUtils.isNotEmpty(validationConfig.getTimeColumnName());
+  }
+
+  /**
+   * We only want to check intervals if the table has a time column
+   */
+  public static boolean eligibleForSegmentIntervalCheck(SegmentsValidationAndRetentionConfig validationConfig) {
+    return StringUtils.isNotEmpty(validationConfig.getTimeColumnName());
+  }
+
+  /**
+   * Converts push frequency into duration. For invalid or less than 'hourly' push frequency, treats it as 'daily'.
+   */
+  public static Duration convertToDuration(String pushFrequency) {
+    if ("hourly".equalsIgnoreCase(pushFrequency)) {
+      return Duration.standardHours(1L);
+    }
+    if ("daily".equalsIgnoreCase(pushFrequency)) {
+      return Duration.standardDays(1L);
+    }
+    if ("weekly".equalsIgnoreCase(pushFrequency)) {
+      return Duration.standardDays(7L);
+    }
+    if ("monthly".equalsIgnoreCase(pushFrequency)) {
+      return Duration.standardDays(30L);
+    }
+    return Duration.standardDays(1L);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 31b01fd..08e7c41 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -20,8 +20,8 @@ package org.apache.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -29,12 +29,13 @@ import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
-import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.SegmentIntervalUtils;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
+import org.joda.time.base.BaseInterval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,13 +80,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     int numMissingSegments = 0;
     int numSegments = offlineSegmentZKMetadataList.size();
     SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
-    if (numSegments >= 2 && StringUtils.isNotEmpty(validationConfig.getTimeColumnName())) {
+    if (SegmentIntervalUtils.eligibleForMissingSegmentCheck(numSegments, validationConfig)) {
       List<Interval> segmentIntervals = new ArrayList<>(numSegments);
       int numSegmentsWithInvalidIntervals = 0;
       for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
         Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
-        if (timeInterval != null && TimeUtils.timeValueInValidRange(timeInterval.getStartMillis()) && TimeUtils
-            .timeValueInValidRange(timeInterval.getEndMillis())) {
+        if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
           segmentIntervals.add(timeInterval);
         } else {
           numSegmentsWithInvalidIntervals ++;
@@ -95,7 +95,7 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
         LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName,
             numSegmentsWithInvalidIntervals);
       }
-      Duration frequency = convertToDuration(validationConfig.getSegmentPushFrequency());
+      Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
       numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency);
     }
     // Update the gauge that contains the number of missing segments
@@ -132,25 +132,6 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
   }
 
   /**
-   * Converts push frequency into duration. For invalid or less than 'hourly' push frequency, treats it as 'daily'.
-   */
-  private Duration convertToDuration(String pushFrequency) {
-    if ("hourly".equalsIgnoreCase(pushFrequency)) {
-      return Duration.standardHours(1L);
-    }
-    if ("daily".equalsIgnoreCase(pushFrequency)) {
-      return Duration.standardDays(1L);
-    }
-    if ("weekly".equalsIgnoreCase(pushFrequency)) {
-      return Duration.standardDays(7L);
-    }
-    if ("monthly".equalsIgnoreCase(pushFrequency)) {
-      return Duration.standardDays(30L);
-    }
-    return Duration.standardDays(1L);
-  }
-
-  /**
    * Computes the number of missing segments based on the given existing segment intervals and the expected frequency
    * of the intervals.
    * <p>We count the interval as missing if there are at least two intervals between the start of the previous interval
@@ -171,7 +152,7 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     }
 
     // Sort the intervals by ascending starting time
-    segmentIntervals.sort((o1, o2) -> Long.compare(o1.getStartMillis(), o2.getStartMillis()));
+    segmentIntervals.sort(Comparator.comparingLong(BaseInterval::getStartMillis));
 
     int numMissingSegments = 0;
     long frequencyMs = frequency.getMillis();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index f56b96b..f541821 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -32,6 +32,7 @@ import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
 import org.apache.pinot.tools.admin.command.DeleteClusterCommand;
 import org.apache.pinot.tools.admin.command.GenerateDataCommand;
 import org.apache.pinot.tools.admin.command.MoveReplicaGroup;
+import org.apache.pinot.tools.admin.command.OfflineSegmentIntervalCheckerCommand;
 import org.apache.pinot.tools.admin.command.PostQueryCommand;
 import org.apache.pinot.tools.admin.command.RealtimeProvisioningHelperCommand;
 import org.apache.pinot.tools.admin.command.RebalanceTableCommand;
@@ -79,7 +80,7 @@ public class PinotAdministrator {
 
   // @formatter:off
   @Argument(handler = SubCommandHandler.class, metaVar = "<subCommand>")
-  @SubCommands({@SubCommand(name = "GenerateData", impl = GenerateDataCommand.class), @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class), @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class), @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class), @SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class), @SubCommand(name = "StartController", impl = StartControllerCommand.class), @SubCommand(name = "StartBroker",  [...]
+  @SubCommands({@SubCommand(name = "GenerateData", impl = GenerateDataCommand.class), @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class), @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class), @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class), @SubCommand(name = "StreamAvroIntoKafka", impl = StreamAvroIntoKafkaCommand.class), @SubCommand(name = "StartController", impl = StartControllerCommand.class), @SubCommand(name = "StartBroker",  [...]
   Command _subCommand;
   // @formatter:on
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
new file mode 100644
index 0000000..8a3096a
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.controller.util.SegmentIntervalUtils;
+import org.apache.pinot.tools.Command;
+import org.joda.time.Interval;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pinot admin command to list all offline segments with invalid intervals, grouped by table name
+ */
+public class OfflineSegmentIntervalCheckerCommand extends AbstractBaseAdminCommand implements Command {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalCheckerCommand.class);
+
+  private ZKHelixAdmin _helixAdmin;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+  @Option(name = "-zkAddress", required = true, metaVar = "<http>", usage = "Zookeeper server:port/cluster")
+  private String _zkAddress;
+
+  @Option(name = "-clusterName", required = true, metaVar = "<String>", usage = "Helix cluster name")
+  private String _clusterName;
+
+  @Option(name = "-tableNames", metaVar = "<string>", usage = "Comma separated list of tables to check for invalid segment intervals")
+  private String _tableNames;
+
+  @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
+  private boolean _help = false;
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public String toString() {
+    return "OfflineSegmentIntervalChecker";
+  }
+
+  @Override
+  public String getName() {
+    return "OfflineSegmentIntervalChecker";
+  }
+
+  @Override
+  public void cleanup() {
+
+  }
+
+  @Override
+  public String description() {
+    return "Prints out offline segments with invalid time intervals";
+  }
+
+  @Override
+  public boolean execute()
+      throws Exception {
+    LOGGER.info("Executing command: " + toString());
+
+    _helixAdmin = new ZKHelixAdmin(_zkAddress);
+    _propertyStore = new ZkHelixPropertyStore<>(_zkAddress, new ZNRecordSerializer(),
+        PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName));
+
+    List<String> offlineTables = new ArrayList<>();
+    if (StringUtils.isBlank(_tableNames)) {
+      List<String> resourcesInCluster = _helixAdmin.getResourcesInCluster(_clusterName);
+      for (String tableName : resourcesInCluster) {
+        if (TableNameBuilder.isOfflineTableResource(tableName)) {
+          offlineTables.add(tableName);
+        }
+      }
+    } else {
+      for (String tableName : _tableNames.split(",")) {
+        if (ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName) != null) {
+          offlineTables.add(tableName);
+        } else {
+          LOGGER.warn("Table config not found for table {}. Skipping", tableName);
+        }
+      }
+    }
+
+    LOGGER.info("Tables to check: {}", offlineTables);
+    for (String offlineTableName : offlineTables) {
+      LOGGER.info("Checking table {}", offlineTableName);
+      List<String> segmentsWithInvalidIntervals = checkOfflineTablesSegmentIntervals(offlineTableName);
+      if (CollectionUtils.isNotEmpty(segmentsWithInvalidIntervals)) {
+        LOGGER.info("Table: {} has {} segments with invalid interval: {}", offlineTableName,
+            segmentsWithInvalidIntervals.size(), segmentsWithInvalidIntervals);
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Collects the names of the table's segments that have invalid time intervals; the caller logs them
+   * @param offlineTableName name of the offline table whose segments are checked
+   */
+  private List<String> checkOfflineTablesSegmentIntervals(String offlineTableName) {
+    TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, offlineTableName);
+    List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList =
+        ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, offlineTableName);
+
+    // collect segments with invalid time intervals
+    List<String> segmentsWithInvalidIntervals = new ArrayList<>();
+    if (SegmentIntervalUtils.eligibleForSegmentIntervalCheck(tableConfig.getValidationConfig())) {
+      for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
+        Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
+        if (!SegmentIntervalUtils.isValidInterval(timeInterval)) {
+          segmentsWithInvalidIntervals.add(offlineSegmentZKMetadata.getSegmentName());
+        }
+      }
+    }
+    return segmentsWithInvalidIntervals;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org