You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/05/12 15:55:51 UTC

[incubator-pinot] branch master updated: TimeColumnName in MutableSegmentImpl (#5368)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b4e461  TimeColumnName in MutableSegmentImpl (#5368)
1b4e461 is described below

commit 1b4e461e25bc5c35f972e55487575ca8623ec5fb
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue May 12 08:55:43 2020 -0700

    TimeColumnName in MutableSegmentImpl (#5368)
    
    Provide the time column name to MutableSegmentImpl via RealtimeSegmentConfig. MutableSegmentImpl uses this name to decide which column's values are used to record minTime and maxTime for the realtime segment (previously, every column with field type TIME or DATE_TIME was used).
---
 .../realtime/HLRealtimeSegmentDataManager.java     |  2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  3 ++-
 .../indexsegment/mutable/MutableSegmentImpl.java   |  5 +++--
 .../core/realtime/impl/RealtimeSegmentConfig.java  | 25 ++++++++++++++++------
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 940064b..c6a0fd5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -178,7 +178,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final int capacity = _streamConfig.getFlushThresholdRows();
     RealtimeSegmentConfig realtimeSegmentConfig =
         new RealtimeSegmentConfig.Builder().setSegmentName(_segmentName).setStreamName(_streamConfig.getTopicName())
-            .setSchema(schema).setCapacity(capacity)
+            .setSchema(schema).setTimeColumnName(_timeColumnName).setCapacity(capacity)
             .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 0daae74..3058252 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1074,6 +1074,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
     _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
+    String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
     _partitionLevelStreamConfig = new PartitionLevelStreamConfig(_tableNameWithType, indexingConfig.getStreamConfigs());
@@ -1143,7 +1144,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
         new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
-            .setSchema(_schema).setCapacity(_segmentMaxRowCount)
+            .setSchema(_schema).setTimeColumnName(timeColumnName).setCapacity(_segmentMaxRowCount)
             .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index bf98521..39d7a30 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -98,6 +98,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
   private final String _segmentName;
   private final Schema _schema;
+  private final String _timeColumnName;
   private final int _capacity;
   private final SegmentMetadata _segmentMetadata;
   private final boolean _offHeap;
@@ -143,6 +144,7 @@ public class MutableSegmentImpl implements MutableSegment {
   public MutableSegmentImpl(RealtimeSegmentConfig config) {
     _segmentName = config.getSegmentName();
     _schema = config.getSchema();
+    _timeColumnName = config.getTimeColumnName();
     _capacity = config.getCapacity();
     _segmentMetadata = new SegmentMetadataImpl(config.getRealtimeSegmentZKMetadata(), _schema) {
       @Override
@@ -419,8 +421,7 @@ public class MutableSegmentImpl implements MutableSegment {
       }
 
       // Update min/max value for time column
-      FieldSpec.FieldType fieldType = fieldSpec.getFieldType();
-      if (fieldType.equals(FieldSpec.FieldType.TIME) || fieldType.equals(FieldSpec.FieldType.DATE_TIME)) {
+      if (column.equals(_timeColumnName)) {
         long timeValue;
         if (value instanceof Number) {
           timeValue = ((Number) value).longValue();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 6a33b44..0f97048 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -30,6 +30,7 @@ public class RealtimeSegmentConfig {
   private final String _segmentName;
   private final String _streamName;
   private final Schema _schema;
+  private final String _timeColumnName;
   private final int _capacity;
   private final int _avgNumMultiValues;
   private final Set<String> _noDictionaryColumns;
@@ -47,8 +48,9 @@ public class RealtimeSegmentConfig {
   private final boolean _nullHandlingEnabled;
   private final String _consumerDir;
 
-  private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
-      int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
+  // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
+  private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, String timeColumnName,
+      int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
       Set<String> invertedIndexColumns, Set<String> textIndexColumns,
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
@@ -56,6 +58,7 @@ public class RealtimeSegmentConfig {
     _segmentName = segmentName;
     _streamName = streamName;
     _schema = schema;
+    _timeColumnName = timeColumnName;
     _capacity = capacity;
     _avgNumMultiValues = avgNumMultiValues;
     _noDictionaryColumns = noDictionaryColumns;
@@ -86,6 +89,10 @@ public class RealtimeSegmentConfig {
     return _schema;
   }
 
+  public String getTimeColumnName() {
+    return _timeColumnName;
+  }
+
   public int getCapacity() {
     return _capacity;
   }
@@ -159,6 +166,7 @@ public class RealtimeSegmentConfig {
     private String _segmentName;
     private String _streamName;
     private Schema _schema;
+    private String _timeColumnName;
     private int _capacity;
     private int _avgNumMultiValues;
     private Set<String> _noDictionaryColumns;
@@ -194,6 +202,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setTimeColumnName(String timeColumnName) {
+      _timeColumnName = timeColumnName;
+      return this;
+    }
+
     public Builder setCapacity(int capacity) {
       _capacity = capacity;
       return this;
@@ -283,10 +296,10 @@ public class RealtimeSegmentConfig {
     }
 
     public RealtimeSegmentConfig build() {
-      return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
-          _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns, _textIndexColumns,
-          _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn, _partitionFunction,
-          _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
+      return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _timeColumnName, _capacity,
+          _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+          _textIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn,
+          _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org