Posted to commits@pinot.apache.org by sn...@apache.org on 2019/06/10 20:01:07 UTC

[incubator-pinot] branch master updated: Cleaning up getTableName() for segment zk metadata (#4288)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new caac81d  Cleaning up getTableName() for segment zk metadata (#4288)
caac81d is described below

commit caac81dfca7bb2de927e2c6306b554ac5cf5d6e6
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Jun 10 13:00:59 2019 -0700

    Cleaning up getTableName() for segment zk metadata (#4288)
    
    * Cleaning up getTableName() for segment zk metadata
    Removing unnecessary calls to segmentZKMetadata.getTableName()
    
    * Updated variable names for HL,LLRealtimeSegmentDataManager
---
 .../helix/core/PinotHelixResourceManager.java      | 47 ++++++++++------------
 .../helix/core/retention/RetentionManager.java     |  4 +-
 .../core/retention/strategy/RetentionStrategy.java |  3 +-
 .../retention/strategy/TimeRetentionStrategy.java  |  6 +--
 .../strategy/TimeRetentionStrategyTest.java        | 15 +++----
 .../realtime/HLRealtimeSegmentDataManager.java     | 19 +++++----
 .../realtime/LLRealtimeSegmentDataManager.java     | 13 +++---
 .../starter/helix/SegmentFetcherAndLoader.java     | 13 +++---
 8 files changed, 57 insertions(+), 63 deletions(-)
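
The core of the change is that callers now pass the table name with type explicitly instead of reading it back from segment ZK metadata via getTableName(). For reference, here is a minimal, self-contained sketch of the new RetentionStrategy contract this patch introduces; the classes below are simplified stand-ins for illustration only, not the actual Pinot classes.

import java.util.concurrent.TimeUnit;

// Simplified stand-ins for the Pinot classes touched by this patch; only the
// fields needed to illustrate the new isPurgeable(tableNameWithType, ...) contract.
class SegmentZKMetadataSketch {
  private final String segmentName;
  private final TimeUnit timeUnit;
  private final long endTime;

  SegmentZKMetadataSketch(String segmentName, TimeUnit timeUnit, long endTime) {
    this.segmentName = segmentName;
    this.timeUnit = timeUnit;
    this.endTime = endTime;
  }

  String getSegmentName() { return segmentName; }
  TimeUnit getTimeUnit() { return timeUnit; }
  long getEndTime() { return endTime; }
}

// After this patch, the table name is an explicit argument; the strategy no
// longer calls segmentZKMetadata.getTableName().
interface RetentionStrategySketch {
  boolean isPurgeable(String tableNameWithType, SegmentZKMetadataSketch segmentZKMetadata);
}

class TimeRetentionStrategySketch implements RetentionStrategySketch {
  private final long retentionMs;

  TimeRetentionStrategySketch(TimeUnit retentionUnit, long retentionValue) {
    this.retentionMs = retentionUnit.toMillis(retentionValue);
  }

  @Override
  public boolean isPurgeable(String tableNameWithType, SegmentZKMetadataSketch segmentZKMetadata) {
    TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
    if (timeUnit == null) {
      // The table name supplied by the caller is used only for logging here,
      // mirroring the change in TimeRetentionStrategy.
      System.out.println("Time unit is not set for segment: " + segmentZKMetadata.getSegmentName()
          + " of table: " + tableNameWithType);
      return false;
    }
    long endTimeMs = timeUnit.toMillis(segmentZKMetadata.getEndTime());
    return endTimeMs < System.currentTimeMillis() - retentionMs;
  }

  public static void main(String[] args) {
    RetentionStrategySketch strategy = new TimeRetentionStrategySketch(TimeUnit.DAYS, 30L);
    long sixtyDaysAgo = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis()) - 60;
    SegmentZKMetadataSketch metadata =
        new SegmentZKMetadataSketch("myTable_OFFLINE_0", TimeUnit.DAYS, sixtyDaysAgo);
    // The caller supplies the table name it already knows, as RetentionManager now does.
    System.out.println(strategy.isPurgeable("myTable_OFFLINE", metadata)); // true
  }
}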

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 877458d..cb57eb4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1632,7 +1632,7 @@ public class PinotHelixResourceManager {
           "Failed to update ZK metadata for segment: " + segmentName + " of table: " + offlineTableName);
     }
     LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, offlineTableName);
-    final String rawTableName = offlineSegmentZKMetadata.getTableName();
+    final String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
     TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, rawTableName);
     Preconditions.checkNotNull(tableConfig);
 
@@ -1640,14 +1640,14 @@ public class PinotHelixResourceManager {
       // Send a message to the servers to update the segment.
       // We return success even if we are not able to send messages (which can happen if no servers are alive).
       // For segment validation errors we would have returned earlier.
-      sendSegmentRefreshMessage(offlineSegmentZKMetadata);
+      sendSegmentRefreshMessage(offlineTableName, offlineSegmentZKMetadata);
       // Send a message to the brokers to update the table's time boundary info if the segment push type is APPEND.
       if (shouldSendTimeboundaryRefreshMsg(rawTableName, tableConfig)) {
-        sendTimeboundaryRefreshMessageToBrokers(offlineSegmentZKMetadata);
+        sendTimeboundaryRefreshMessageToBrokers(offlineTableName, offlineSegmentZKMetadata);
       }
     } else {
       // Go through the ONLINE->OFFLINE->ONLINE state transition to update the segment
-      if (!updateExistedSegment(offlineSegmentZKMetadata)) {
+      if (!updateExistedSegment(offlineTableName, offlineSegmentZKMetadata)) {
         LOGGER.error("Failed to refresh segment: {} of table: {} by the ONLINE->OFFLINE->ONLINE state transition",
             segmentName, offlineTableName);
       }
@@ -1655,13 +1655,13 @@ public class PinotHelixResourceManager {
   }
 
   // Send a message to the pinot brokers to notify them to update its Timeboundary Info.
-  private void sendTimeboundaryRefreshMessageToBrokers(OfflineSegmentZKMetadata segmentZKMetadata) {
+  private void sendTimeboundaryRefreshMessageToBrokers(String tableNameWithType,
+      OfflineSegmentZKMetadata segmentZKMetadata) {
     final String segmentName = segmentZKMetadata.getSegmentName();
-    final String rawTableName = segmentZKMetadata.getTableName();
-    final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+    final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     final int timeoutMs = -1; // Infinite timeout on the recipient.
 
-    TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(offlineTableName, segmentName);
+    TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(tableNameWithType, segmentName);
 
     Criteria recipientCriteria = new Criteria();
     // Currently Helix does not support send message to a Spectator. So we walk around the problem by sending the
@@ -1672,7 +1672,7 @@ public class PinotHelixResourceManager {
     recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW);
     // The brokerResource field in the EXTERNALVIEW stores the offline table name in the Partition subfield.
-    recipientCriteria.setPartition(offlineTableName);
+    recipientCriteria.setPartition(tableNameWithType);
 
     ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
     LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
@@ -1686,7 +1686,7 @@ public class PinotHelixResourceManager {
       // May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
       // the latest time boundary info.
       LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
-          offlineTableName, nMsgsSent);
+          tableNameWithType, nMsgsSent);
     }
   }
 
@@ -1770,12 +1770,13 @@ public class PinotHelixResourceManager {
    * The message is sent as session-specific, so if a new zk session is created (e.g. server restarts)
    * it will not get the message.
    *
+   * @param tableNameWithType Table name with type
    * @param segmentZKMetadata is the metadata of the newly arrived segment.
    */
   // NOTE: method should be thread-safe
-  private void sendSegmentRefreshMessage(OfflineSegmentZKMetadata segmentZKMetadata) {
+  private void sendSegmentRefreshMessage(String tableNameWithType, OfflineSegmentZKMetadata segmentZKMetadata) {
     final String segmentName = segmentZKMetadata.getSegmentName();
-    final String rawTableName = segmentZKMetadata.getTableName();
+    final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     final int timeoutMs = -1; // Infinite timeout on the recipient.
 
@@ -1842,27 +1843,21 @@ public class PinotHelixResourceManager {
     HelixHelper.addSegmentToIdealState(_helixZkManager, offlineTableName, segmentName, assignedInstances);
   }
 
-  private boolean updateExistedSegment(SegmentZKMetadata segmentZKMetadata) {
-    final String tableName;
-    if (segmentZKMetadata instanceof RealtimeSegmentZKMetadata) {
-      tableName = TableNameBuilder.REALTIME.tableNameWithType(segmentZKMetadata.getTableName());
-    } else {
-      tableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentZKMetadata.getTableName());
-    }
+  private boolean updateExistedSegment(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
     final String segmentName = segmentZKMetadata.getSegmentName();
 
     HelixDataAccessor helixDataAccessor = _helixZkManager.getHelixDataAccessor();
-    PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableName);
+    PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableNameWithType);
 
     // Set all partitions to offline to unload them from the servers
     boolean updateSuccessful;
     do {
-      final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+      final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
       final Set<String> instanceSet = idealState.getInstanceSet(segmentName);
       if (instanceSet == null || instanceSet.size() == 0) {
         // We are trying to refresh a segment, but there are no instances currently assigned for fielding this segment.
         // When those instances do come up, the segment will be uploaded correctly, so return success but log a warning.
-        LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableName);
+        LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableNameWithType);
         return true;
       }
       for (final String instance : instanceSet) {
@@ -1872,7 +1867,7 @@ public class PinotHelixResourceManager {
     } while (!updateSuccessful);
 
     // Check that the ideal state has been written to ZK
-    IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+    IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
     Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
     for (String state : instanceStateMap.values()) {
       if (!"OFFLINE".equals(state)) {
@@ -1883,7 +1878,7 @@ public class PinotHelixResourceManager {
 
     // Wait until the partitions are offline in the external view
     LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView");
-    if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE",
+    if (!ifExternalViewChangeReflectedForState(tableNameWithType, segmentName, "OFFLINE",
         _externalViewOnlineToOfflineTimeoutMillis, false)) {
       LOGGER
           .error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
@@ -1893,7 +1888,7 @@ public class PinotHelixResourceManager {
 
     // Set all partitions to online so that they load the new segment data
     do {
-      final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+      final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
       final Set<String> instanceSet = idealState.getInstanceSet(segmentName);
       LOGGER.info("Found {} instances for segment '{}', in ideal state", instanceSet.size(), segmentName);
       for (final String instance : instanceSet) {
@@ -1904,7 +1899,7 @@ public class PinotHelixResourceManager {
     } while (!updateSuccessful);
 
     // Check that the ideal state has been written to ZK
-    updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+    updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
     instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
     LOGGER
         .info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b5595e7..7d5182d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -113,7 +113,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
     List<String> segmentsToDelete = new ArrayList<>();
     for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _pinotHelixResourceManager
         .getOfflineSegmentMetadata(offlineTableName)) {
-      if (retentionStrategy.isPurgeable(offlineSegmentZKMetadata)) {
+      if (retentionStrategy.isPurgeable(offlineTableName, offlineSegmentZKMetadata)) {
         segmentsToDelete.add(offlineSegmentZKMetadata.getSegmentName());
       }
     }
@@ -141,7 +141,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
         }
       } else {
         // Sealed segment
-        if (retentionStrategy.isPurgeable(realtimeSegmentZKMetadata)) {
+        if (retentionStrategy.isPurgeable(realtimeTableName, realtimeSegmentZKMetadata)) {
           segmentsToDelete.add(segmentName);
         }
       }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
index 5917c9a..e8f6336 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
@@ -29,8 +29,9 @@ public interface RetentionStrategy {
   /**
    * Returns whether the segment should be purged
    *
+   * @param tableNameWithType Table name with type
    * @param segmentZKMetadata Segment ZK metadata
    * @return Whether the segment should be purged
    */
-  boolean isPurgeable(SegmentZKMetadata segmentZKMetadata);
+  boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata);
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 84373c5..167e030 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -38,11 +38,11 @@ public class TimeRetentionStrategy implements RetentionStrategy {
   }
 
   @Override
-  public boolean isPurgeable(SegmentZKMetadata segmentZKMetadata) {
+  public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
     TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
     if (timeUnit == null) {
       LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName());
+          segmentZKMetadata.getSegmentName(), tableNameWithType);
       return false;
     }
     long endTime = segmentZKMetadata.getEndTime();
@@ -51,7 +51,7 @@ public class TimeRetentionStrategy implements RetentionStrategy {
     // Check that the end time is between 1971 and 2071
     if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
       LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName(), endTime, timeUnit);
+          segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit);
       return false;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
index b2533aa..009b397 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
@@ -34,34 +34,35 @@ public class TimeRetentionStrategyTest {
 
   @Test
   public void testTimeRetention() {
+    String tableNameWithType = "myTable_OFFLINE";
     TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.DAYS, 30L);
 
     SegmentZKMetadata metadata = new OfflineSegmentZKMetadata();
 
     // Without setting time unit or end time, should not throw exception
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
     metadata.setTimeUnit(TimeUnit.DAYS);
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
 
     // Set end time to Jan 2nd, 1970 (not purgeable due to bogus timestamp)
     metadata.setEndTime(1L);
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
 
     // Set end time to today
     long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
     metadata.setEndTime(today);
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
 
     // Set end time to two weeks ago
     metadata.setEndTime(today - 14);
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
 
     // Set end time to two months ago (purgeable due to being past the retention period)
     metadata.setEndTime(today - 60);
-    assertTrue(retentionStrategy.isPurgeable(metadata));
+    assertTrue(retentionStrategy.isPurgeable(tableNameWithType, metadata));
 
     // Set end time to 200 years in the future (not purgeable due to bogus timestamp)
     metadata.setEndTime(today + (365 * 200));
-    assertFalse(retentionStrategy.isPurgeable(metadata));
+    assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 45e0128..9b5c542 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,7 +51,6 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -63,7 +62,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class);
   private final static long ONE_MINUTE_IN_MILLSEC = 1000 * 60;
 
-  private final String tableName;
+  private final String tableNameWithType;
   private final String segmentName;
   private final Schema schema;
   private final String timeColumnName;
@@ -109,7 +108,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
     this.serverMetrics = serverMetrics;
     this.segmentName = realtimeSegmentZKMetadata.getSegmentName();
-    this.tableName = tableConfig.getTableName();
+    this.tableNameWithType = tableConfig.getTableName();
     this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
@@ -164,10 +163,10 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
     _streamLevelConsumer =
-        _streamConsumerFactory.createStreamLevelConsumer(clientId, tableName, schema, instanceMetadata, serverMetrics);
+        _streamConsumerFactory.createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata, serverMetrics);
     _streamLevelConsumer.start();
 
-    tableStreamName = tableName + "_" + _streamConfig.getTopicName();
+    tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName();
 
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     if (indexingConfig != null && indexingConfig.isAggregateMetrics()) {
@@ -258,7 +257,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           // lets convert the segment now
           RealtimeSegmentConverter converter =
               new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
-                  realtimeSegmentZKMetadata.getTableName(), timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
+                  tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
                   sortedColumn, HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
                   null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
 
@@ -339,7 +338,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             segmentLogger
                 .error("FATAL: Exception committing or shutting down consumer commitSuccessful={}", commitSuccessful,
                     e);
-            serverMetrics.addMeteredTableValue(tableName, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
+            serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
             if (!commitSuccessful) {
               _streamLevelConsumer.shutdown();
             }
@@ -348,7 +347,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           try {
             segmentLogger.info("Marking current segment as completed in Helix");
             RealtimeSegmentZKMetadata metadataToOverwrite = new RealtimeSegmentZKMetadata();
-            metadataToOverwrite.setTableName(realtimeSegmentZKMetadata.getTableName());
+            metadataToOverwrite.setTableName(tableNameWithType);
             metadataToOverwrite.setSegmentName(realtimeSegmentZKMetadata.getSegmentName());
             metadataToOverwrite.setSegmentType(SegmentType.OFFLINE);
             metadataToOverwrite.setStatus(Status.DONE);
@@ -377,7 +376,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     });
 
     indexingThread.start();
-    serverMetrics.addValueToTableGauge(tableName, ServerGauge.SEGMENT_COUNT, 1L);
+    serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     segmentLogger.debug("scheduling keepIndexing timer check");
     // start a schedule timer to keep track of the segment
     TimerService.timer.schedule(segmentStatusTask, ONE_MINUTE_IN_MILLSEC, ONE_MINUTE_IN_MILLSEC);
@@ -416,7 +415,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private void updateCurrentDocumentCountMetrics() {
     int currentRawDocs = realtimeSegment.getNumDocsIndexed();
     serverMetrics
-        .addValueToTableGauge(tableName, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get()));
+        .addValueToTableGauge(tableNameWithType, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get()));
     lastUpdatedRawDocuments.set(currentRawDocs);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7fcbd9c..59d2bb8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -68,7 +68,6 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
 import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
 import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
 import org.apache.pinot.core.realtime.stream.TransientConsumerException;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -234,7 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private PartitionLevelConsumer _partitionLevelConsumer = null;
   private StreamMetadataProvider _streamMetadataProvider = null;
   private final File _resourceTmpDir;
-  private final String _tableName;
+  private final String _tableNameWithType;
   private final String _timeColumnName;
   private final List<String> _invertedIndexColumns;
   private final List<String> _noDictionaryColumns;
@@ -661,7 +660,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // lets convert the segment now
       RealtimeSegmentConverter converter =
           new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
-              _segmentZKMetadata.getTableName(), _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
+              _tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
               _invertedIndexColumns, _noDictionaryColumns, _starTreeIndexSpec);
       segmentLogger.info("Trying to build segment");
       try {
@@ -1047,11 +1046,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableName = _tableConfig.getTableName();
+    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
-    _metricKeyName = _tableName + "-" + _streamTopic + "-" + _streamPartitionId;
+    _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
-    _tableStreamName = _tableName + "_" + _streamTopic;
+    _tableStreamName = _tableNameWithType + "_" + _streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
         indexLoadingConfig.isRealtimeOffheapAllocation(), indexLoadingConfig.isDirectRealtimeOffheapAllocation(),
         serverMetrics);
@@ -1193,7 +1192,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas,
     // Number of rows consumed should be used for consumption metric.
     long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get();
-    _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
+    _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
     _lastUpdatedRowsIndexed.set(_numRowsIndexed);
     final long now = now();
     final int rowsConsumed = _numRowsConsumed - _lastConsumedCount;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 5126525..d92fce2 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -111,7 +111,7 @@ public class SegmentFetcherAndLoader {
             localSegmentMetadata = null;
           }
           try {
-            if (!isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) {
+            if (!isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) {
               LOGGER.info("Segment metadata same as before, loading {} of table {} (crc {}) from disk", segmentName,
                   tableNameWithType, localSegmentMetadata.getCrc());
               _instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, indexDir);
@@ -143,7 +143,7 @@ public class SegmentFetcherAndLoader {
       // If we get here, then either it is the case that we have the segment loaded in memory (and therefore present
       // in disk) or, we need to load from the server. In the former case, we still need to check if the metadata
       // that we have is different from that in zookeeper.
-      if (isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) {
+      if (isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) {
         if (localSegmentMetadata == null) {
           LOGGER.info("Loading new segment {} of table {} from controller", segmentName, tableNameWithType);
         } else {
@@ -172,20 +172,19 @@ public class SegmentFetcherAndLoader {
     }
   }
 
-  private boolean isNewSegmentMetadata(@Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata,
-      @Nullable SegmentMetadata existedSegmentMetadata) {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newSegmentZKMetadata.getTableName());
+  private boolean isNewSegmentMetadata(@Nonnull String tableNameWithType,
+      @Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata, @Nullable SegmentMetadata existedSegmentMetadata) {
     String segmentName = newSegmentZKMetadata.getSegmentName();
 
     if (existedSegmentMetadata == null) {
-      LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, offlineTableName);
+      LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, tableNameWithType);
       return true;
     }
 
     long newCrc = newSegmentZKMetadata.getCrc();
     long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc());
     LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {} in table: {}", newCrc, existedCrc,
-        segmentName, offlineTableName);
+        segmentName, tableNameWithType);
     return newCrc != existedCrc;
   }
 

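Where the patch still needs the raw table name (for example to look up the offline table config), it derives it from the table name with type via TableNameBuilder.extractRawTableName(...) rather than from the segment metadata. Below is a rough, self-contained approximation of that naming convention, assuming the usual _OFFLINE/_REALTIME suffixes; it is a sketch for illustration, not the real TableNameBuilder.

public final class TableNameSketch {
  private TableNameSketch() { }

  // Approximates TableNameBuilder.OFFLINE.tableNameWithType(rawTableName).
  static String offlineTableNameWithType(String rawTableName) {
    return rawTableName + "_OFFLINE";
  }

  // Approximates TableNameBuilder.extractRawTableName(tableNameWithType).
  static String extractRawTableName(String tableNameWithType) {
    if (tableNameWithType.endsWith("_OFFLINE")) {
      return tableNameWithType.substring(0, tableNameWithType.length() - "_OFFLINE".length());
    }
    if (tableNameWithType.endsWith("_REALTIME")) {
      return tableNameWithType.substring(0, tableNameWithType.length() - "_REALTIME".length());
    }
    return tableNameWithType; // already a raw name
  }

  public static void main(String[] args) {
    // Before the patch: rawTableName came from segmentZKMetadata.getTableName().
    // After the patch: it is derived from the table name the caller already holds.
    String offlineTableName = offlineTableNameWithType("myTable"); // myTable_OFFLINE
    String rawTableName = extractRawTableName(offlineTableName);   // myTable
    System.out.println(offlineTableName + " -> " + rawTableName);
  }
}
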

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org