You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/06/10 20:01:07 UTC
[incubator-pinot] branch master updated: Cleaning up getTableName() for segment zk metadata (#4288)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new caac81d Cleaning up getTableName() for segment zk metadata (#4288)
caac81d is described below
commit caac81dfca7bb2de927e2c6306b554ac5cf5d6e6
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Mon Jun 10 13:00:59 2019 -0700
Cleaning up getTableName() for segment zk metadata (#4288)
* Cleaning up getTableName() for segment zk metadata
Removing unnecessary calls to segmentZKMetadata.getTableName()
* Updated variable names for HLRealtimeSegmentDataManager and LLRealtimeSegmentDataManager
---
.../helix/core/PinotHelixResourceManager.java | 47 ++++++++++------------
.../helix/core/retention/RetentionManager.java | 4 +-
.../core/retention/strategy/RetentionStrategy.java | 3 +-
.../retention/strategy/TimeRetentionStrategy.java | 6 +--
.../strategy/TimeRetentionStrategyTest.java | 15 +++----
.../realtime/HLRealtimeSegmentDataManager.java | 19 +++++----
.../realtime/LLRealtimeSegmentDataManager.java | 13 +++---
.../starter/helix/SegmentFetcherAndLoader.java | 13 +++---
8 files changed, 57 insertions(+), 63 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 877458d..cb57eb4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1632,7 +1632,7 @@ public class PinotHelixResourceManager {
"Failed to update ZK metadata for segment: " + segmentName + " of table: " + offlineTableName);
}
LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, offlineTableName);
- final String rawTableName = offlineSegmentZKMetadata.getTableName();
+ final String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName);
TableConfig tableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, rawTableName);
Preconditions.checkNotNull(tableConfig);
@@ -1640,14 +1640,14 @@ public class PinotHelixResourceManager {
// Send a message to the servers to update the segment.
// We return success even if we are not able to send messages (which can happen if no servers are alive).
// For segment validation errors we would have returned earlier.
- sendSegmentRefreshMessage(offlineSegmentZKMetadata);
+ sendSegmentRefreshMessage(offlineTableName, offlineSegmentZKMetadata);
// Send a message to the brokers to update the table's time boundary info if the segment push type is APPEND.
if (shouldSendTimeboundaryRefreshMsg(rawTableName, tableConfig)) {
- sendTimeboundaryRefreshMessageToBrokers(offlineSegmentZKMetadata);
+ sendTimeboundaryRefreshMessageToBrokers(offlineTableName, offlineSegmentZKMetadata);
}
} else {
// Go through the ONLINE->OFFLINE->ONLINE state transition to update the segment
- if (!updateExistedSegment(offlineSegmentZKMetadata)) {
+ if (!updateExistedSegment(offlineTableName, offlineSegmentZKMetadata)) {
LOGGER.error("Failed to refresh segment: {} of table: {} by the ONLINE->OFFLINE->ONLINE state transition",
segmentName, offlineTableName);
}
@@ -1655,13 +1655,13 @@ public class PinotHelixResourceManager {
}
// Send a message to the pinot brokers to notify them to update its Timeboundary Info.
- private void sendTimeboundaryRefreshMessageToBrokers(OfflineSegmentZKMetadata segmentZKMetadata) {
+ private void sendTimeboundaryRefreshMessageToBrokers(String tableNameWithType,
+ OfflineSegmentZKMetadata segmentZKMetadata) {
final String segmentName = segmentZKMetadata.getSegmentName();
- final String rawTableName = segmentZKMetadata.getTableName();
- final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+ final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
final int timeoutMs = -1; // Infinite timeout on the recipient.
- TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(offlineTableName, segmentName);
+ TimeboundaryRefreshMessage refreshMessage = new TimeboundaryRefreshMessage(tableNameWithType, segmentName);
Criteria recipientCriteria = new Criteria();
// Currently Helix does not support send message to a Spectator. So we walk around the problem by sending the
@@ -1672,7 +1672,7 @@ public class PinotHelixResourceManager {
recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW);
// The brokerResource field in the EXTERNALVIEW stores the offline table name in the Partition subfield.
- recipientCriteria.setPartition(offlineTableName);
+ recipientCriteria.setPartition(tableNameWithType);
ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
LOGGER.info("Sending timeboundary refresh message for segment {} of table {}:{} to recipients {}", segmentName,
@@ -1686,7 +1686,7 @@ public class PinotHelixResourceManager {
// May be the case when none of the brokers are up yet. That is OK, because when they come up they will get
// the latest time boundary info.
LOGGER.warn("Unable to send timeboundary refresh message for {} of table {}, nMsgs={}", segmentName,
- offlineTableName, nMsgsSent);
+ tableNameWithType, nMsgsSent);
}
}
@@ -1770,12 +1770,13 @@ public class PinotHelixResourceManager {
* The message is sent as session-specific, so if a new zk session is created (e.g. server restarts)
* it will not get the message.
*
+ * @param tableNameWithType Table name with type
* @param segmentZKMetadata is the metadata of the newly arrived segment.
*/
// NOTE: method should be thread-safe
- private void sendSegmentRefreshMessage(OfflineSegmentZKMetadata segmentZKMetadata) {
+ private void sendSegmentRefreshMessage(String tableNameWithType, OfflineSegmentZKMetadata segmentZKMetadata) {
final String segmentName = segmentZKMetadata.getSegmentName();
- final String rawTableName = segmentZKMetadata.getTableName();
+ final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
final int timeoutMs = -1; // Infinite timeout on the recipient.
@@ -1842,27 +1843,21 @@ public class PinotHelixResourceManager {
HelixHelper.addSegmentToIdealState(_helixZkManager, offlineTableName, segmentName, assignedInstances);
}
- private boolean updateExistedSegment(SegmentZKMetadata segmentZKMetadata) {
- final String tableName;
- if (segmentZKMetadata instanceof RealtimeSegmentZKMetadata) {
- tableName = TableNameBuilder.REALTIME.tableNameWithType(segmentZKMetadata.getTableName());
- } else {
- tableName = TableNameBuilder.OFFLINE.tableNameWithType(segmentZKMetadata.getTableName());
- }
+ private boolean updateExistedSegment(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
final String segmentName = segmentZKMetadata.getSegmentName();
HelixDataAccessor helixDataAccessor = _helixZkManager.getHelixDataAccessor();
- PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableName);
+ PropertyKey idealStatePropertyKey = _keyBuilder.idealStates(tableNameWithType);
// Set all partitions to offline to unload them from the servers
boolean updateSuccessful;
do {
- final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+ final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
final Set<String> instanceSet = idealState.getInstanceSet(segmentName);
if (instanceSet == null || instanceSet.size() == 0) {
// We are trying to refresh a segment, but there are no instances currently assigned for fielding this segment.
// When those instances do come up, the segment will be uploaded correctly, so return success but log a warning.
- LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableName);
+ LOGGER.warn("No instances as yet for segment {}, table {}", segmentName, tableNameWithType);
return true;
}
for (final String instance : instanceSet) {
@@ -1872,7 +1867,7 @@ public class PinotHelixResourceManager {
} while (!updateSuccessful);
// Check that the ideal state has been written to ZK
- IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+ IdealState updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
for (String state : instanceStateMap.values()) {
if (!"OFFLINE".equals(state)) {
@@ -1883,7 +1878,7 @@ public class PinotHelixResourceManager {
// Wait until the partitions are offline in the external view
LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView");
- if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE",
+ if (!ifExternalViewChangeReflectedForState(tableNameWithType, segmentName, "OFFLINE",
_externalViewOnlineToOfflineTimeoutMillis, false)) {
LOGGER
.error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
@@ -1893,7 +1888,7 @@ public class PinotHelixResourceManager {
// Set all partitions to online so that they load the new segment data
do {
- final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+ final IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
final Set<String> instanceSet = idealState.getInstanceSet(segmentName);
LOGGER.info("Found {} instances for segment '{}', in ideal state", instanceSet.size(), segmentName);
for (final String instance : instanceSet) {
@@ -1904,7 +1899,7 @@ public class PinotHelixResourceManager {
} while (!updateSuccessful);
// Check that the ideal state has been written to ZK
- updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
+ updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
LOGGER
.info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index b5595e7..7d5182d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -113,7 +113,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
List<String> segmentsToDelete = new ArrayList<>();
for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : _pinotHelixResourceManager
.getOfflineSegmentMetadata(offlineTableName)) {
- if (retentionStrategy.isPurgeable(offlineSegmentZKMetadata)) {
+ if (retentionStrategy.isPurgeable(offlineTableName, offlineSegmentZKMetadata)) {
segmentsToDelete.add(offlineSegmentZKMetadata.getSegmentName());
}
}
@@ -141,7 +141,7 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
}
} else {
// Sealed segment
- if (retentionStrategy.isPurgeable(realtimeSegmentZKMetadata)) {
+ if (retentionStrategy.isPurgeable(realtimeTableName, realtimeSegmentZKMetadata)) {
segmentsToDelete.add(segmentName);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
index 5917c9a..e8f6336 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
@@ -29,8 +29,9 @@ public interface RetentionStrategy {
/**
* Returns whether the segment should be purged
*
+ * @param tableNameWithType Table name with type
* @param segmentZKMetadata Segment ZK metadata
* @return Whether the segment should be purged
*/
- boolean isPurgeable(SegmentZKMetadata segmentZKMetadata);
+ boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 84373c5..167e030 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -38,11 +38,11 @@ public class TimeRetentionStrategy implements RetentionStrategy {
}
@Override
- public boolean isPurgeable(SegmentZKMetadata segmentZKMetadata) {
+ public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
if (timeUnit == null) {
LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(),
- segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName());
+ segmentZKMetadata.getSegmentName(), tableNameWithType);
return false;
}
long endTime = segmentZKMetadata.getEndTime();
@@ -51,7 +51,7 @@ public class TimeRetentionStrategy implements RetentionStrategy {
// Check that the end time is between 1971 and 2071
if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(),
- segmentZKMetadata.getSegmentName(), segmentZKMetadata.getTableName(), endTime, timeUnit);
+ segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit);
return false;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
index b2533aa..009b397 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategyTest.java
@@ -34,34 +34,35 @@ public class TimeRetentionStrategyTest {
@Test
public void testTimeRetention() {
+ String tableNameWithType = "myTable_OFFLINE";
TimeRetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.DAYS, 30L);
SegmentZKMetadata metadata = new OfflineSegmentZKMetadata();
// Without setting time unit or end time, should not throw exception
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
metadata.setTimeUnit(TimeUnit.DAYS);
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
// Set end time to Jan 2nd, 1970 (not purgeable due to bogus timestamp)
metadata.setEndTime(1L);
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
// Set end time to today
long today = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
metadata.setEndTime(today);
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
// Set end time to two weeks ago
metadata.setEndTime(today - 14);
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
// Set end time to two months ago (purgeable due to being past the retention period)
metadata.setEndTime(today - 60);
- assertTrue(retentionStrategy.isPurgeable(metadata));
+ assertTrue(retentionStrategy.isPurgeable(tableNameWithType, metadata));
// Set end time to 200 years in the future (not purgeable due to bogus timestamp)
metadata.setEndTime(today + (365 * 200));
- assertFalse(retentionStrategy.isPurgeable(metadata));
+ assertFalse(retentionStrategy.isPurgeable(tableNameWithType, metadata));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 45e0128..9b5c542 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,7 +51,6 @@ import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -63,7 +62,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private static final Logger LOGGER = LoggerFactory.getLogger(HLRealtimeSegmentDataManager.class);
private final static long ONE_MINUTE_IN_MILLSEC = 1000 * 60;
- private final String tableName;
+ private final String tableNameWithType;
private final String segmentName;
private final Schema schema;
private final String timeColumnName;
@@ -109,7 +108,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
this.serverMetrics = serverMetrics;
this.segmentName = realtimeSegmentZKMetadata.getSegmentName();
- this.tableName = tableConfig.getTableName();
+ this.tableNameWithType = tableConfig.getTableName();
this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
@@ -164,10 +163,10 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
_streamLevelConsumer =
- _streamConsumerFactory.createStreamLevelConsumer(clientId, tableName, schema, instanceMetadata, serverMetrics);
+ _streamConsumerFactory.createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata, serverMetrics);
_streamLevelConsumer.start();
- tableStreamName = tableName + "_" + _streamConfig.getTopicName();
+ tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName();
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (indexingConfig != null && indexingConfig.isAggregateMetrics()) {
@@ -258,7 +257,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// lets convert the segment now
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
- realtimeSegmentZKMetadata.getTableName(), timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
+ tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
sortedColumn, HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
@@ -339,7 +338,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
segmentLogger
.error("FATAL: Exception committing or shutting down consumer commitSuccessful={}", commitSuccessful,
e);
- serverMetrics.addMeteredTableValue(tableName, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
+ serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.REALTIME_OFFSET_COMMIT_EXCEPTIONS, 1L);
if (!commitSuccessful) {
_streamLevelConsumer.shutdown();
}
@@ -348,7 +347,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
segmentLogger.info("Marking current segment as completed in Helix");
RealtimeSegmentZKMetadata metadataToOverwrite = new RealtimeSegmentZKMetadata();
- metadataToOverwrite.setTableName(realtimeSegmentZKMetadata.getTableName());
+ metadataToOverwrite.setTableName(tableNameWithType);
metadataToOverwrite.setSegmentName(realtimeSegmentZKMetadata.getSegmentName());
metadataToOverwrite.setSegmentType(SegmentType.OFFLINE);
metadataToOverwrite.setStatus(Status.DONE);
@@ -377,7 +376,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
});
indexingThread.start();
- serverMetrics.addValueToTableGauge(tableName, ServerGauge.SEGMENT_COUNT, 1L);
+ serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
segmentLogger.debug("scheduling keepIndexing timer check");
// start a schedule timer to keep track of the segment
TimerService.timer.schedule(segmentStatusTask, ONE_MINUTE_IN_MILLSEC, ONE_MINUTE_IN_MILLSEC);
@@ -416,7 +415,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private void updateCurrentDocumentCountMetrics() {
int currentRawDocs = realtimeSegment.getNumDocsIndexed();
serverMetrics
- .addValueToTableGauge(tableName, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get()));
+ .addValueToTableGauge(tableNameWithType, ServerGauge.DOCUMENT_COUNT, (currentRawDocs - lastUpdatedRawDocuments.get()));
lastUpdatedRawDocuments.set(currentRawDocs);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7fcbd9c..59d2bb8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -68,7 +68,6 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
-import org.apache.pinot.core.realtime.stream.StreamMessageMetadata;
import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
import org.apache.pinot.core.realtime.stream.TransientConsumerException;
import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -234,7 +233,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private PartitionLevelConsumer _partitionLevelConsumer = null;
private StreamMetadataProvider _streamMetadataProvider = null;
private final File _resourceTmpDir;
- private final String _tableName;
+ private final String _tableNameWithType;
private final String _timeColumnName;
private final List<String> _invertedIndexColumns;
private final List<String> _noDictionaryColumns;
@@ -661,7 +660,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// lets convert the segment now
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
- _segmentZKMetadata.getTableName(), _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
+ _tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
_invertedIndexColumns, _noDictionaryColumns, _starTreeIndexSpec);
segmentLogger.info("Trying to build segment");
try {
@@ -1047,11 +1046,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_segmentName = new LLCSegmentName(_segmentNameStr);
_streamPartitionId = _segmentName.getPartitionId();
- _tableName = _tableConfig.getTableName();
+ _tableNameWithType = _tableConfig.getTableName();
_timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
- _metricKeyName = _tableName + "-" + _streamTopic + "-" + _streamPartitionId;
+ _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
- _tableStreamName = _tableName + "_" + _streamTopic;
+ _tableStreamName = _tableNameWithType + "_" + _streamTopic;
_memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
indexLoadingConfig.isRealtimeOffheapAllocation(), indexLoadingConfig.isDirectRealtimeOffheapAllocation(),
serverMetrics);
@@ -1193,7 +1192,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Number of rows indexed should be used for DOCUMENT_COUNT metric, and also for segment flush. Whereas,
// Number of rows consumed should be used for consumption metric.
long rowsIndexed = _numRowsIndexed - _lastUpdatedRowsIndexed.get();
- _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
+ _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, rowsIndexed);
_lastUpdatedRowsIndexed.set(_numRowsIndexed);
final long now = now();
final int rowsConsumed = _numRowsConsumed - _lastConsumedCount;
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
index 5126525..d92fce2 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentFetcherAndLoader.java
@@ -111,7 +111,7 @@ public class SegmentFetcherAndLoader {
localSegmentMetadata = null;
}
try {
- if (!isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) {
+ if (!isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) {
LOGGER.info("Segment metadata same as before, loading {} of table {} (crc {}) from disk", segmentName,
tableNameWithType, localSegmentMetadata.getCrc());
_instanceDataManager.addOfflineSegment(tableNameWithType, segmentName, indexDir);
@@ -143,7 +143,7 @@ public class SegmentFetcherAndLoader {
// If we get here, then either it is the case that we have the segment loaded in memory (and therefore present
// in disk) or, we need to load from the server. In the former case, we still need to check if the metadata
// that we have is different from that in zookeeper.
- if (isNewSegmentMetadata(newSegmentZKMetadata, localSegmentMetadata)) {
+ if (isNewSegmentMetadata(tableNameWithType, newSegmentZKMetadata, localSegmentMetadata)) {
if (localSegmentMetadata == null) {
LOGGER.info("Loading new segment {} of table {} from controller", segmentName, tableNameWithType);
} else {
@@ -172,20 +172,19 @@ public class SegmentFetcherAndLoader {
}
}
- private boolean isNewSegmentMetadata(@Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata,
- @Nullable SegmentMetadata existedSegmentMetadata) {
- String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(newSegmentZKMetadata.getTableName());
+ private boolean isNewSegmentMetadata(@Nonnull String tableNameWithType,
+ @Nonnull OfflineSegmentZKMetadata newSegmentZKMetadata, @Nullable SegmentMetadata existedSegmentMetadata) {
String segmentName = newSegmentZKMetadata.getSegmentName();
if (existedSegmentMetadata == null) {
- LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, offlineTableName);
+ LOGGER.info("Existed segment metadata is null for segment: {} in table: {}", segmentName, tableNameWithType);
return true;
}
long newCrc = newSegmentZKMetadata.getCrc();
long existedCrc = Long.valueOf(existedSegmentMetadata.getCrc());
LOGGER.info("New segment CRC: {}, existed segment CRC: {} for segment: {} in table: {}", newCrc, existedCrc,
- segmentName, offlineTableName);
+ segmentName, tableNameWithType);
return newCrc != existedCrc;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org