You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/05/12 15:55:51 UTC
[incubator-pinot] branch master updated: TimeColumnName in MutableSegmentImpl (#5368)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1b4e461 TimeColumnName in MutableSegmentImpl (#5368)
1b4e461 is described below
commit 1b4e461e25bc5c35f972e55487575ca8623ec5fb
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue May 12 08:55:43 2020 -0700
TimeColumnName in MutableSegmentImpl (#5368)
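Provide the time column name to MutableSegmentImpl via RealtimeSegmentConfig. This time column name decides which column is used to record minTime and maxTime for the realtime segment. Previously, MutableSegmentImpl updated these values for any column whose field type was TIME or DATE_TIME; it now does so only for the column whose name equals the configured time column name.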
---
.../realtime/HLRealtimeSegmentDataManager.java | 2 +-
.../realtime/LLRealtimeSegmentDataManager.java | 3 ++-
.../indexsegment/mutable/MutableSegmentImpl.java | 5 +++--
.../core/realtime/impl/RealtimeSegmentConfig.java | 25 ++++++++++++++++------
4 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 940064b..c6a0fd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -178,7 +178,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final int capacity = _streamConfig.getFlushThresholdRows();
RealtimeSegmentConfig realtimeSegmentConfig =
new RealtimeSegmentConfig.Builder().setSegmentName(_segmentName).setStreamName(_streamConfig.getTopicName())
- .setSchema(schema).setCapacity(capacity)
+ .setSchema(schema).setTimeColumnName(_timeColumnName).setCapacity(capacity)
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0daae74..3058252 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1074,6 +1074,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
_protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
+ String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
_partitionLevelStreamConfig = new PartitionLevelStreamConfig(_tableNameWithType, indexingConfig.getStreamConfigs());
@@ -1143,7 +1144,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String consumerDir = realtimeTableDataManager.getConsumerDir();
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
- .setSchema(_schema).setCapacity(_segmentMaxRowCount)
+ .setSchema(_schema).setTimeColumnName(timeColumnName).setCapacity(_segmentMaxRowCount)
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
.setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index bf98521..39d7a30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -98,6 +98,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final String _segmentName;
private final Schema _schema;
+ private final String _timeColumnName;
private final int _capacity;
private final SegmentMetadata _segmentMetadata;
private final boolean _offHeap;
@@ -143,6 +144,7 @@ public class MutableSegmentImpl implements MutableSegment {
public MutableSegmentImpl(RealtimeSegmentConfig config) {
_segmentName = config.getSegmentName();
_schema = config.getSchema();
+ _timeColumnName = config.getTimeColumnName();
_capacity = config.getCapacity();
_segmentMetadata = new SegmentMetadataImpl(config.getRealtimeSegmentZKMetadata(), _schema) {
@Override
@@ -419,8 +421,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
// Update min/max value for time column
- FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
- if (fieldType.equals(FieldSpec.FieldType.TIME) || fieldType.equals(FieldSpec.FieldType.DATE_TIME)) {
+ if (column.equals(_timeColumnName)) {
long timeValue;
if (value instanceof Number) {
timeValue = ((Number) value).longValue();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 6a33b44..0f97048 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -30,6 +30,7 @@ public class RealtimeSegmentConfig {
private final String _segmentName;
private final String _streamName;
private final Schema _schema;
+ private final String _timeColumnName;
private final int _capacity;
private final int _avgNumMultiValues;
private final Set<String> _noDictionaryColumns;
@@ -47,8 +48,9 @@ public class RealtimeSegmentConfig {
private final boolean _nullHandlingEnabled;
private final String _consumerDir;
- private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
- int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
+ // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
+ private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, String timeColumnName,
+ int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
Set<String> invertedIndexColumns, Set<String> textIndexColumns,
RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
@@ -56,6 +58,7 @@ public class RealtimeSegmentConfig {
_segmentName = segmentName;
_streamName = streamName;
_schema = schema;
+ _timeColumnName = timeColumnName;
_capacity = capacity;
_avgNumMultiValues = avgNumMultiValues;
_noDictionaryColumns = noDictionaryColumns;
@@ -86,6 +89,10 @@ public class RealtimeSegmentConfig {
return _schema;
}
+ public String getTimeColumnName() {
+ return _timeColumnName;
+ }
+
public int getCapacity() {
return _capacity;
}
@@ -159,6 +166,7 @@ public class RealtimeSegmentConfig {
private String _segmentName;
private String _streamName;
private Schema _schema;
+ private String _timeColumnName;
private int _capacity;
private int _avgNumMultiValues;
private Set<String> _noDictionaryColumns;
@@ -194,6 +202,11 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setTimeColumnName(String timeColumnName) {
+ _timeColumnName = timeColumnName;
+ return this;
+ }
+
public Builder setCapacity(int capacity) {
_capacity = capacity;
return this;
@@ -283,10 +296,10 @@ public class RealtimeSegmentConfig {
}
public RealtimeSegmentConfig build() {
- return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
- _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns, _textIndexColumns,
- _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn, _partitionFunction,
- _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
+ return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _timeColumnName, _capacity,
+ _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+ _textIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn,
+ _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org