Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/01/22 21:16:19 UTC

[GitHub] [incubator-pinot] jackjlli opened a new pull request #6479: Support data ingestion for generating offline segment in one pass

jackjlli opened a new pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479


   ## Description
   Currently, generating an offline segment requires traversing the raw data twice: once to gather stats in `RecordReaderSegmentCreationDataSource`, and once more to ingest the actual data.
   
   This PR introduces a new way of ingesting offline data that traverses the raw data only once. Similar to the mutable realtime segment, a simplified class called `IntermediateSegment` is initialized to collect all the data into dictionaries and forward indexes. After all the records are ingested, an `IntermediateSegmentRecordReader` is passed into `SegmentIndexCreationDriver` along with the `SegmentGeneratorConfig`, and the final offline segment is built like this:
   ```
       // Build the segment from intermediate segment.
       SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
       driver.init(segmentGeneratorConfig, intermediateSegmentRecordReader);
       driver.build();
   ```
   
   This mechanism can greatly reduce memory pressure in some environments. For example, when Pinot segments are generated directly on Spark executors and the data is only available as an iterator, the raw data no longer has to be loaded into memory twice.
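   To make the one-pass idea concrete, here is a minimal sketch in plain Java. `OnePassColumnBuffer` is a hypothetical, single-column stand-in for `IntermediateSegment`, not its actual code: it consumes an iterator exactly once, dictionary-encodes each value into an insertion-ordered dictionary plus a forward index of dictIds, and collects min/max/cardinality stats along the way, so rows can later be replayed from the buffer instead of from the raw input.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical single-column stand-in for IntermediateSegment (assumes non-null values). */
   public class OnePassColumnBuffer {
     private final Map<String, Integer> _valueToId = new HashMap<>();
     private final List<String> _idToValue = new ArrayList<>();     // insertion-ordered dictionary
     private final List<Integer> _forwardIndex = new ArrayList<>(); // one dictId per row
     private String _min;
     private String _max;

     /** Consumes the iterator exactly once, updating dictionary, forward index and stats per row. */
     public void ingest(Iterator<String> rows) {
       while (rows.hasNext()) {
         String value = rows.next();
         Integer dictId = _valueToId.get(value);
         if (dictId == null) {
           dictId = _idToValue.size();
           _valueToId.put(value, dictId);
           _idToValue.add(value);
         }
         _forwardIndex.add(dictId);
         if (_min == null || value.compareTo(_min) < 0) {
           _min = value;
         }
         if (_max == null || value.compareTo(_max) > 0) {
           _max = value;
         }
       }
     }

     public int getCardinality() { return _idToValue.size(); }
     public int getNumDocs() { return _forwardIndex.size(); }
     public String getMin() { return _min; }
     public String getMax() { return _max; }

     /** Replays row docId from the buffer; the original iterator is never needed again. */
     public String getValue(int docId) { return _idToValue.get(_forwardIndex.get(docId)); }

     public static void main(String[] args) {
       OnePassColumnBuffer buffer = new OnePassColumnBuffer();
       buffer.ingest(List.of("b", "a", "b", "c").iterator());
       System.out.println(buffer.getCardinality() + " " + buffer.getMin() + " " + buffer.getMax()); // 3 a c
     }
   }
   ```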
   
   
   ## Upgrade Notes
   Does this PR prevent a zero-downtime upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade issue introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release.
   
   If you have a series of commits adding or enabling a feature, then
   add this section only in the final commit that marks the feature completed.
   Refer to earlier release notes to see examples of text.
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r599147765



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+public class IntermediateSegmentRecordReader implements RecordReader {

Review comment:
       Sorting can be done either in the reader or on the input data. We can do that in a later PR.





[GitHub] [incubator-pinot] jackjlli edited a comment on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli edited a comment on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-767180093


   > I think this is still two passes, right? The second pass is on IntermediateSegment.
   > Could you elaborate a bit on where the memory pressure comes from in the existing implementation? And how does this approach (IntermediateSegment) solve it? It does need its own storage.
   
   This is still two passes, but only one pass over the raw data.
   For example, in a Spark job where the data is row-based (like `Dataset<Row>`, `RDD<Row>`, `Dataset<T>`, `RDD<T>`), traversing the records requires calling the `foreachPartition(iterator)` API. Since an iterator can only provide the data once, the raw data would have to be cached in the same executor in order to traverse it twice if we stick to the existing segment generation code.
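   The single-use nature of the partition iterator is easy to see in plain Java. In Spark's Java API, `foreachPartition` hands each partition to user code as a `java.util.Iterator`; the sketch below uses an ordinary `Iterator` as a stand-in (no Spark dependency), and `SinglePassIteratorDemo` is a hypothetical name. A second pass over the same iterator sees nothing, so the only way to traverse it twice is to materialize every row first, which corresponds to the caching approach listed as 1).

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   public class SinglePassIteratorDemo {
     /** Counts rows by draining the iterator; a drained iterator yields nothing afterwards. */
     static int count(Iterator<String> it) {
       int n = 0;
       while (it.hasNext()) {
         it.next();
         n++;
       }
       return n;
     }

     /** Caching approach: materialize the whole partition so it can be traversed more than once. */
     static List<String> cache(Iterator<String> it) {
       List<String> cached = new ArrayList<>();
       it.forEachRemaining(cached::add);
       return cached;
     }

     public static void main(String[] args) {
       Iterator<String> partition = List.of("r1", "r2", "r3").iterator();
       System.out.println(count(partition)); // 3
       System.out.println(count(partition)); // 0: the iterator is already exhausted

       List<String> cached = cache(List.of("r1", "r2", "r3").iterator());
       // 6: two full passes, at the cost of holding every row in memory
       System.out.println(count(cached.iterator()) + count(cached.iterator()));
     }
   }
   ```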
   
   The possible solutions are listed below:
   1) Cache all the raw data in executor memory so the same data can be reused (using the existing segment generation code).
   2) Traverse the raw data once to gather the intermediate stats, and create the offline segment from the intermediate results (this PR).
   3) Call `foreachPartition(iterator)` twice: gather stats into an extra DF/RDD in the first iteration, then ingest the actual raw data based on that DF/RDD in the second (this still needs a code refactor and involves one extra DF/RDD).
   
   Approach 1) needs to load all the raw data into a single executor before initializing the `SegmentIndexCreationDriver` (raw data traversal is encapsulated in that class). Approaches 2) and 3) share the same idea of splitting the current code into two parts; the difference is that 2) works on a stats collector, while 3) works on the raw data (one extra data traversal).
   Approach 2) works well when column cardinality is not very high (just like mutable segments for realtime ingestion), but when cardinality is high it could consume roughly as much memory as caching the raw data.
   Approach 3) works well if the data is cached in the same executor, but the cached raw data itself can still be costly, since the deserialized data is held in memory.
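   A back-of-the-envelope estimate shows why cardinality decides between approaches 1) and 2). The sketch below is a simplification (it ignores JVM object headers, hash-table overhead and null handling) and the method names are hypothetical: caching a column costs roughly `numRows * avgValueBytes`, while a dictionary plus a 4-byte dictId per row costs `cardinality * avgValueBytes + numRows * 4`. The sample inputs echo the `DEFAULT_EST_AVG_COL_SIZE = 32` and `DEFAULT_EST_CARDINALITY = 5000` constants in the `IntermediateSegment` diff further down this thread.

   ```java
   public class IntermediateMemoryEstimate {
     /** Approximate bytes to cache one column's raw values (ignores JVM object overhead). */
     static long rawBytes(long numRows, long avgValueBytes) {
       return numRows * avgValueBytes;
     }

     /** Approximate bytes for a dictionary plus an int dictId per row (same simplifications). */
     static long dictEncodedBytes(long numRows, long cardinality, long avgValueBytes) {
       return cardinality * avgValueBytes + numRows * Integer.BYTES;
     }

     public static void main(String[] args) {
       long rows = 1_000_000L;
       long avg = 32L;
       System.out.println(rawBytes(rows, avg));                 // 32000000
       System.out.println(dictEncodedBytes(rows, 5_000L, avg)); // 4160000: low cardinality
       System.out.println(dictEncodedBytes(rows, rows, avg));   // 36000000: every value distinct
     }
   }
   ```

   With 5,000 distinct values the encoded form is nearly 8x smaller than the raw cache; when every value is distinct it is slightly larger, which is the high-cardinality caveat for approach 2).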
   
   One possible optimization on top of this PR is to reuse the dictionaries and forward indexes stored in `IntermediateSegment` and pass them to the immutable segment.
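   Reusing the intermediate dictionaries is not a straight copy if, as I understand Pinot's layout, mutable dictionaries assign dictIds in insertion order while immutable segments store a sorted dictionary. Under that assumption, reuse means computing an old-to-new dictId mapping once and rewriting the forward index through it. A minimal sketch with hypothetical names, using `String` values only:

   ```java
   import java.util.Arrays;
   import java.util.Comparator;

   public class DictionaryRemap {
     /** Returns oldDictId -> newDictId, where new IDs follow the sorted order of the values. */
     static int[] sortedIdMapping(String[] insertionOrderedDict) {
       Integer[] order = new Integer[insertionOrderedDict.length];
       for (int i = 0; i < order.length; i++) {
         order[i] = i;
       }
       // After sorting, order[newId] holds the old dictId of the newId-th smallest value.
       Arrays.sort(order, Comparator.comparing((Integer i) -> insertionOrderedDict[i]));
       int[] oldToNew = new int[order.length];
       for (int newId = 0; newId < order.length; newId++) {
         oldToNew[order[newId]] = newId;
       }
       return oldToNew;
     }

     /** Rewrites a forward index of old dictIds in place so it points into the sorted dictionary. */
     static void remapForwardIndex(int[] forwardIndex, int[] oldToNew) {
       for (int docId = 0; docId < forwardIndex.length; docId++) {
         forwardIndex[docId] = oldToNew[forwardIndex[docId]];
       }
     }

     public static void main(String[] args) {
       String[] dict = {"b", "a", "c"}; // insertion order
       int[] forward = {0, 1, 0, 2};     // rows: b, a, b, c
       remapForwardIndex(forward, sortedIdMapping(dict));
       System.out.println(Arrays.toString(forward)); // [1, 0, 1, 2] against sorted dict [a, b, c]
     }
   }
   ```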



[GitHub] [incubator-pinot] mcvsubbu commented on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-771020682


   > Thanks for the details @jackjlli, could you also explain why IntermediateSegment is better than the existing MutableSegment? For example, you could stream input data to MutableSegment and flush it as needed. This also solves multiple problems:
   > 
   > * Common code base for offline and RT segment generation (at least for the streaming part).
   > * Sorting can now be done for offline within SegmentGeneration, instead of requiring users to do so explicitly.
   > * Auto segment sizing that happens in RT can also be done for offline now.
   > 
   > Thoughts @jackjlli @Jackie-Jiang?
   
   I think this is a good idea to explore, but I suspect memory utilization on the offline side may go up significantly.
   
   Also, the auto-segment sizing in realtime is implemented (in the controller) by learning the history of segments already completed. For offline generation, if we can keep a history or some learning mechanism, then it may be possible to implement approximate segment sizing algorithms -- whether we use MutableSegment to build segments or not.
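   One simple form of such a learning mechanism is sketched below. It is a hypothetical illustration, not how the controller implements realtime segment sizing: it keeps an exponential moving average of bytes per row over completed segments and converts a target segment size into a suggested row count, falling back to a default until any history exists. Where the history lives (locally or fetched from the controller) is left open.

   ```java
   public class SegmentSizeEstimator {
     private final double _alpha;      // weight given to the newest observation, in (0, 1]
     private double _bytesPerRow = -1; // -1 until the first completed segment is observed

     public SegmentSizeEstimator(double alpha) {
       _alpha = alpha;
     }

     /** Records one completed segment's size and row count (assumes numRows > 0). */
     public void observe(long segmentSizeBytes, long numRows) {
       double observed = (double) segmentSizeBytes / numRows;
       _bytesPerRow = _bytesPerRow < 0 ? observed : _alpha * observed + (1 - _alpha) * _bytesPerRow;
     }

     /** Suggests a row count for the next segment, or defaultRows if there is no history yet. */
     public long suggestRows(long targetSizeBytes, long defaultRows) {
       return _bytesPerRow < 0 ? defaultRows : (long) (targetSizeBytes / _bytesPerRow);
     }

     public static void main(String[] args) {
       SegmentSizeEstimator estimator = new SegmentSizeEstimator(0.5);
       System.out.println(estimator.suggestRows(200_000_000L, 100_000L)); // 100000: no history yet
       estimator.observe(100_000_000L, 1_000_000L); // 100 bytes/row
       estimator.observe(150_000_000L, 1_000_000L); // 150 bytes/row, average becomes 125
       System.out.println(estimator.suggestRows(200_000_000L, 100_000L)); // 1600000
     }
   }
   ```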



[GitHub] [incubator-pinot] jackjlli commented on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-771083123


   > > Thanks for the details @jackjlli, could you also explain why IntermediateSegment is better than the existing MutableSegment? For example, you could stream input data to MutableSegment and flush it as needed. This also solves multiple problems:
   > > 
   > > * Common code base for offline and RT segment generation (at least for the streaming part).
   > > * Sorting can now be done for offline within SegmentGeneration, instead of requiring users to do so explicitly.
   > > * Auto segment sizing that happens in RT can also be done for offline now.
   > > 
   > > Thoughts @jackjlli @Jackie-Jiang?
   > 
   > I think this is a good idea to explore, but I suspect memory utilization on the offline side may go up significantly.
   > 
   > Also, the auto-segment sizing in realtime is implemented (in the controller) by learning the history of segments already completed. For offline generation, if we can keep a history or some learning mechanism, then it may be possible to implement approximate segment sizing algorithms -- whether we use MutableSegment to build segments or not.
   
   1. Yes, memory utilization will go up significantly; that's why I used `IntermediateSegment` rather than `MutableSegment` directly as the intermediate container here. Both `IntermediateSegment` and `MutableSegment` share a common minimal piece of logic: both have a forward index. The difference is that `MutableSegment` also maintains all the applicable indexes (inverted index, text index, etc.) for querying purposes, while `IntermediateSegment` keeps only the minimal components, such as the dictionary.
   2. Also, if we want partitioning/sorting, these steps can be done on the platform (e.g. MapReduce, Spark) before converting the raw data. In fact, we already have that logic at LinkedIn. Once this PR is committed, we can consider open sourcing that Spark code as well.
   3. Auto segment sizing is a good idea: historical data can be used to predict the cardinality or buffer size. However, offline segment generation is not always done on the same machine, so locally kept history is of little use if it cannot be reused. If the history comes from the controller, then all the worker machines would have to query the Pinot controller simultaneously to get it, which could put a huge query load on the controller. That's why I didn't include it in this PR. We can always add it to `IntermediateSegment` in the future, since `IntermediateSegment` and `MutableSegment` have pretty much the same structure.
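   Point 2 (partitioning and sorting upstream) can look roughly like the sketch below before rows are handed to segment creation. It is a hypothetical, in-memory illustration using a Java 16+ record: `Row` and `memberId` are made-up names, and the modulo bucketing is an assumption. A real job must use the same partition function and partition count that the table's `SegmentPartitionConfig` declares, and would normally let Spark or MapReduce do the shuffle and sort.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;

   public class PartitionAndSort {
     /** Hypothetical row type: a partition/sort key plus a payload. */
     record Row(int memberId, String payload) {}

     /** Splits rows into numPartitions buckets by memberId, then sorts each bucket by memberId. */
     static List<List<Row>> partitionAndSort(List<Row> rows, int numPartitions) {
       List<List<Row>> partitions = new ArrayList<>();
       for (int p = 0; p < numPartitions; p++) {
         partitions.add(new ArrayList<>());
       }
       for (Row row : rows) {
         // Modulo bucketing; floorMod keeps the bucket index non-negative for negative keys.
         partitions.get(Math.floorMod(row.memberId(), numPartitions)).add(row);
       }
       for (List<Row> partition : partitions) {
         partition.sort(Comparator.comparingInt(Row::memberId));
       }
       return partitions;
     }

     public static void main(String[] args) {
       List<Row> rows = List.of(new Row(7, "x"), new Row(2, "y"), new Row(4, "z"), new Row(-1, "w"));
       for (List<Row> partition : partitionAndSort(rows, 2)) {
         System.out.println(partition.stream().map(Row::memberId).toList()); // [2, 4] then [-1, 7]
       }
     }
   }
   ```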
   
   All 3 points above are really good features, but they would be too much for a single PR. It would be good to leave room for those features and pick them up in follow-up PRs if applicable.



[GitHub] [incubator-pinot] codecov-io commented on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-804512971


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=h1) Report
   > Merging [#6479](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=desc) (3ca9e3f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `23.08%`.
   > The diff coverage is `42.85%`.
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6479       +/-   ##
   ===========================================
   - Coverage   66.44%   43.36%   -23.09%     
   ===========================================
     Files        1075     1396      +321     
     Lines       54773    67759    +12986     
     Branches     8168     9807     +1639     
   ===========================================
   - Hits        36396    29384     -7012     
   - Misses      15700    35912    +20212     
   + Partials     2677     2463      -214     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `43.36% <42.85%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...t/broker/broker/BasicAuthAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0Jhc2ljQXV0aEFjY2Vzc0NvbnRyb2xGYWN0b3J5LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/TimeSegmentPruner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL1RpbWVTZWdtZW50UHJ1bmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/interval/Interval.java](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [1422 more](https://codecov.io/gh/apache/incubator-pinot/pull/6479/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=footer). Last update [36d6db6...3ca9e3f](https://codecov.io/gh/apache/incubator-pinot/pull/6479?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [incubator-pinot] mayankshriv commented on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
mayankshriv commented on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-770995816


   Thanks for the details @jackjlli, could you also explain why IntermediateSegment is better than the existing MutableSegment? For example, you could stream input data to MutableSegment and flush it as needed. This also solves multiple problems:
   - Common code base for offline and RT segment generation (at least for the streaming part).
   - Sorting can now be done for offline within SegmentGeneration, instead of requiring users to do so explicitly.
   - Auto segment sizing that happens in RT can also be done for offline now.
   
   Thoughts @jackjlli @Jackie-Jiang?



[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r599151919



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
##########
@@ -33,15 +33,15 @@
  *
  * TODO: Gather more info on the fly to avoid scanning the segment
  */
-public class RealtimeColumnStatistics implements ColumnStatistics {
+public class MutableColumnStatistics implements ColumnStatistics {

Review comment:
       Implementations of `ColumnStatistics` are used during segment creation. As long as there is no protocol change, I don't think the renaming will cause any incompatibility during deployment.





[GitHub] [incubator-pinot] snleee commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r595696885



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);
+
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();

Review comment:
       `IntermediateSegment` should not rely on the realtime off-heap allocation setting from the table config. You can simply use MMAP.
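       For reference, "just use MMAP" means backing the intermediate buffers with memory-mapped files rather than heap or direct memory. The diff already imports Pinot's `MmapMemoryManager`; since its API is not shown here, the sketch below uses plain `java.nio` instead to show what an mmap-backed buffer is, and `MmapBufferDemo`/`allocateMmap` are hypothetical names.

       ```java
       import java.io.IOException;
       import java.nio.MappedByteBuffer;
       import java.nio.channels.FileChannel;
       import java.nio.file.Files;
       import java.nio.file.Path;
       import java.nio.file.StandardOpenOption;

       public class MmapBufferDemo {
         /** Maps a fresh temp file of sizeBytes; the OS pages it to disk instead of using the Java heap. */
         static MappedByteBuffer allocateMmap(Path dir, int sizeBytes) throws IOException {
           Path file = Files.createTempFile(dir, "intermediate-", ".buf");
           file.toFile().deleteOnExit();
           try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
             // Mapping READ_WRITE past the end extends the file; the mapping stays valid after close.
             return channel.map(FileChannel.MapMode.READ_WRITE, 0, sizeBytes);
           }
         }

         public static void main(String[] args) throws IOException {
           Path dir = Files.createTempDirectory("mmap-demo");
           dir.toFile().deleteOnExit(); // registered first, so it is deleted after the files inside it
           MappedByteBuffer forwardIndex = allocateMmap(dir, 4 * 1_000); // room for 1,000 int dictIds
           forwardIndex.putInt(0, 42);
           System.out.println(forwardIndex.getInt(0)); // 42
         }
       }
       ```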

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);
+
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
+    boolean directOffHeap = indexLoadingConfig.isDirectRealtimeOffHeapAllocation();
+    if (offHeap && !directOffHeap) {
+      _memoryManager = new MmapMemoryManager(null, _segmentName, null);
+    } else {
+      _memoryManager = new DirectMemoryManager(_segmentName, null);
+    }
+
+    // Initialize for each column
+    for (FieldSpec fieldSpec : physicalFieldSpecs1) {
+      String column = fieldSpec.getName();
+
+      // Partition info
+      PartitionFunction partitionFunction = null;
+      Set<Integer> partitions = null;
+      if (column.equals(_partitionColumn)) {
+        partitionFunction = _partitionFunction;
+        partitions = new HashSet<>();
+        partitions.add(segmentGeneratorConfig.getSequenceId());
+      }
+
+      FieldSpec.DataType dataType = fieldSpec.getDataType();
+      boolean isFixedWidthColumn = dataType.isFixedWidth();
+      MutableForwardIndex forwardIndex;
+      MutableDictionary dictionary;
+
+      int dictionaryColumnSize;
+      if (isFixedWidthColumn) {
+        dictionaryColumnSize = dataType.size();
+      } else {
+        dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
+      }
+      // NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary
+      int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
+      String dictionaryAllocationContext =
+          buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
+      dictionary = MutableDictionaryFactory
+          .getMutableDictionary(dataType, offHeap, _memoryManager, dictionaryColumnSize,
+              Math.min(estimatedCardinality, _capacity), dictionaryAllocationContext);
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+        forwardIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, _capacity, _memoryManager,
+            allocationContext);
+      } else {
+        // Multi-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+        // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+        forwardIndex = new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
+            indexLoadingConfig.getRealtimeAvgMultiValueCount(), _capacity, Integer.BYTES, _memoryManager,

Review comment:
       Same here. We should not depend on the realtime table config
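One way to act on this comment, sketched under assumptions: `IntermediateSegment` would take its allocation sizes as explicit inputs instead of reading the realtime settings (`isRealtimeOffHeapAllocation()`, `getRealtimeAvgMultiValueCount()`) from `IndexLoadingConfig`. The class `OfflineIndexSizing` and its default average multi-value count of 2 are hypothetical and not Pinot APIs. The remaining constants and the 10% cardinality buffer are copied from the constructor in the diff above.

```java
// Hypothetical sketch: sizing inputs for IntermediateSegment passed in explicitly,
// instead of being read from realtime settings in IndexLoadingConfig. Not a Pinot API.
final class OfflineIndexSizing {
  // Copied from the constants declared in IntermediateSegment.
  static final int DEFAULT_CAPACITY = 100_000;
  static final int DEFAULT_EST_AVG_COL_SIZE = 32;
  static final int DEFAULT_EST_CARDINALITY = 5000;
  // Hypothetical offline default; the PR reads this value from the realtime config instead.
  static final int DEFAULT_AVG_MULTI_VALUE_COUNT = 2;

  final int capacity;
  final int avgMultiValueCount;

  OfflineIndexSizing(int capacity, int avgMultiValueCount) {
    this.capacity = capacity;
    this.avgMultiValueCount = avgMultiValueCount;
  }

  /** Bytes per dictionary entry: the type's fixed width if it has one, else the default estimate. */
  static int dictionaryColumnSize(boolean isFixedWidth, int fixedWidthBytes) {
    return isFixedWidth ? fixedWidthBytes : DEFAULT_EST_AVG_COL_SIZE;
  }

  /** Initial dictionary cardinality: default estimate plus a 10% buffer, capped at the capacity. */
  int initialDictionaryCardinality() {
    int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
    return Math.min(estimatedCardinality, capacity);
  }

  public static void main(String[] args) {
    OfflineIndexSizing sizing = new OfflineIndexSizing(DEFAULT_CAPACITY, DEFAULT_AVG_MULTI_VALUE_COUNT);
    System.out.println(sizing.initialDictionaryCardinality());     // 5500
    System.out.println(dictionaryColumnSize(true, Integer.BYTES)); // 4
    System.out.println(dictionaryColumnSize(false, 0));            // 32
    System.out.println(sizing.avgMultiValueCount);                 // 2
  }
}
```

In this sketch the multi-value forward index would receive `avgMultiValueCount` from the caller, for example through a new, hypothetical setting on `SegmentGeneratorConfig`, rather than from `indexLoadingConfig.getRealtimeAvgMultiValueCount()`.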

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();

Review comment:
       What is the purpose of L92-95? It performs multiple conversions, `Collection<FieldSpec> -> List<FieldSpec> -> Collection<FieldSpec>`. Why do we need to make this unmodifiable?
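A minimal sketch of the point behind this question: the constructor only reads the field specs, so iterating `_schema.getAllFieldSpecs()` directly visits the same specs in the same order as the copied, unmodifiable view, assuming nothing mutates the schema while the constructor runs. `FieldSpecStub` is a hypothetical stand-in for Pinot's `FieldSpec` so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class FieldSpecIteration {
  // Hypothetical stand-in for org.apache.pinot.spi.data.FieldSpec.
  static final class FieldSpecStub {
    private final String name;

    FieldSpecStub(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  // The PR's version: copy into a List, then wrap it in an unmodifiable view before reading.
  static List<String> namesViaCopy(Collection<FieldSpecStub> allFieldSpecs) {
    List<FieldSpecStub> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
    physicalFieldSpecs.addAll(allFieldSpecs);
    Collection<FieldSpecStub> view = Collections.unmodifiableCollection(physicalFieldSpecs);
    List<String> names = new ArrayList<>();
    for (FieldSpecStub spec : view) {
      names.add(spec.getName());
    }
    return names;
  }

  // Simplified version: iterate the schema's collection directly, since it is only read.
  static List<String> namesDirect(Collection<FieldSpecStub> allFieldSpecs) {
    List<String> names = new ArrayList<>();
    for (FieldSpecStub spec : allFieldSpecs) {
      names.add(spec.getName());
    }
    return names;
  }

  public static void main(String[] args) {
    List<FieldSpecStub> specs = Arrays.asList(new FieldSpecStub("memberId"), new FieldSpecStub("country"));
    System.out.println(namesViaCopy(specs).equals(namesDirect(specs))); // true
  }
}
```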

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);

Review comment:
       Let's rename this variable `physicalFieldSpecs` -> `unmodifiablePhysicalFieldSpecs`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] jackjlli commented on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-767180093


   > I think this is still two pass, right? The second pass is on IntermediateSegment.
   > Could you elaborate a bit on where's the memory pressure coming from in the existing implementation? And how does this approach solve it (IntermediateSegment) does need its own storage.
   
   This is still two passes, but only one pass over the raw data.
   E.g. in a Spark job where the data is row-based (like Dataset\<Row\>, RDD\<Row\>, Dataset\<T\>, RDD\<T\>), traversing the records requires calling the API foreachPartition(iterator). Since an iterator can provide the data only once, the raw data would have to be cached in the same executor so that it can be traversed twice, if we stick to the existing segment generation code.
   
   The possible solutions are listed below:
   1) Cache all the raw data in executor memory so that the same data can be reused (using the existing segment generation code).
   2) Traverse the raw data once to gather the intermediate stats, and create the offline segment from the intermediate results (this PR).
   3) Call foreachPartition(iterator) twice: gather stats into an extra DF/RDD in the 1st iteration, then ingest the actual raw data based on that DF/RDD in the 2nd round (a code refactor is still needed, and one extra DF/RDD is involved).
   
   Approach 1) needs to load all the raw data into a single executor before initializing the `SegmentIndexCreationDriver` (raw data traversal is encapsulated in that class). Approaches 2) and 3) share the same idea of splitting the current code into two parts; the difference is that 2) works on a stats collector while 3) works on the raw data (one extra data traversal).
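Approach 2) can be sketched roughly as follows. This is a simplified, hypothetical illustration rather than the PR's `IntermediateSegment`: it uses a `HashMap`/`ArrayList` per column in place of Pinot's mutable dictionaries and forward indices, and rows are plain `String[]` values. A single pass over a one-shot iterator builds a dictionary and a dictionary-encoded forward index per column, after which the stats and the rows are available without touching the raw data again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one pass over raw rows fills per-column dictionaries and forward indices.
final class SinglePassCollector {
  private final int numColumns;
  private final List<Map<String, Integer>> valueToDictId = new ArrayList<>();
  private final List<List<String>> dictIdToValue = new ArrayList<>();
  private final List<List<Integer>> forwardIndex = new ArrayList<>();
  private int numDocs;

  SinglePassCollector(int numColumns) {
    this.numColumns = numColumns;
    for (int i = 0; i < numColumns; i++) {
      valueToDictId.add(new HashMap<>());
      dictIdToValue.add(new ArrayList<>());
      forwardIndex.add(new ArrayList<>());
    }
  }

  /** The only pass over the raw data: each row is dictionary-encoded, then discarded. */
  void ingest(Iterator<String[]> rows) {
    while (rows.hasNext()) {
      String[] row = rows.next();
      for (int col = 0; col < numColumns; col++) {
        Map<String, Integer> dictionary = valueToDictId.get(col);
        Integer dictId = dictionary.get(row[col]);
        if (dictId == null) {
          dictId = dictIdToValue.get(col).size();
          dictionary.put(row[col], dictId);
          dictIdToValue.get(col).add(row[col]);
        }
        forwardIndex.get(col).add(dictId);
      }
      numDocs++;
    }
  }

  /** Stats such as cardinality come from the dictionary, not from re-reading raw data. */
  int cardinality(int col) {
    return dictIdToValue.get(col).size();
  }

  int numDocs() {
    return numDocs;
  }

  /** Second pass: rebuild a row from the intermediate structures instead of the raw iterator. */
  String[] getRow(int docId) {
    String[] row = new String[numColumns];
    for (int col = 0; col < numColumns; col++) {
      row[col] = dictIdToValue.get(col).get(forwardIndex.get(col).get(docId));
    }
    return row;
  }

  public static void main(String[] args) {
    Iterator<String[]> rows = Arrays.asList(
        new String[] {"us", "click"},
        new String[] {"us", "view"},
        new String[] {"de", "click"}).iterator();
    SinglePassCollector collector = new SinglePassCollector(2);
    collector.ingest(rows); // the iterator is exhausted after this call
    System.out.println(collector.numDocs());                  // 3
    System.out.println(collector.cardinality(0));             // 2
    System.out.println(Arrays.toString(collector.getRow(2))); // [de, click]
  }
}
```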


[GitHub] [incubator-pinot] jackjlli edited a comment on pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli edited a comment on pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#issuecomment-767180093


   > I think this is still two pass, right? The second pass is on IntermediateSegment.
   > Could you elaborate a bit on where's the memory pressure coming from in the existing implementation? And how does this approach solve it (IntermediateSegment) does need its own storage.
   
   This is still two passes, but only one pass over the raw data.
   E.g. in a Spark job where the data is row-based (like Dataset\<Row\>, RDD\<Row\>, Dataset\<T\>, RDD\<T\>), traversing the records requires calling the API foreachPartition(iterator). Since an iterator can provide the data only once, the raw data would have to be cached in the same executor so that it can be traversed twice, if we stick to the existing segment generation code.
   
   The possible solutions are listed below:
   1) Cache all the raw data in executor memory so that the same data can be reused (using the existing segment generation code).
   2) Traverse the raw data once to gather the intermediate stats, and create the offline segment from the intermediate results (this PR).
   3) Call foreachPartition(iterator) twice: gather stats into an extra DF/RDD in the 1st iteration, then ingest the actual raw data based on that DF/RDD in the 2nd round (a code refactor is still needed, and one extra DF/RDD is involved).
   
   Approach 1) needs to load all the raw data into a single executor before initializing the `SegmentIndexCreationDriver` (raw data traversal is encapsulated in that class). Approaches 2) and 3) share the same idea of splitting the current code into two parts; the difference is that 2) works on a stats collector while 3) works on the raw data (one extra data traversal).
   Approach 2) works well when column cardinality is not very high (just like mutable segments for realtime ingestion); when cardinality is high, it could consume about as much memory as the raw data itself.
   Approach 3) works well if the data is cached in the same executor, but the cached raw data itself can still be costly since it is kept deserialized in memory.
   
   One possible optimization for this PR is to reuse the dictionaries and forward indices stored in `IntermediateSegment` and pass them to the immutable segment.
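To make the cardinality trade-off for approach 2) concrete, here is a back-of-the-envelope sketch. It assumes a single-value column whose dictionary IDs are stored as 4-byte ints in the forward index (the PR's SV forward index is created with `FieldSpec.DataType.INT`), an average value size of 32 bytes (the PR's `DEFAULT_EST_AVG_COL_SIZE`), and it ignores per-object and hash-table overheads. The class and method names are illustrative only.

```java
// Rough estimate: raw values vs. dictionary plus dictionary-encoded forward index.
final class DictionaryMemoryEstimate {
  static final int DICT_ID_BYTES = Integer.BYTES;

  /** Every document keeps its own copy of the value. */
  static long rawBytes(long numDocs, int avgValueBytes) {
    return numDocs * avgValueBytes;
  }

  /** Each distinct value is stored once; each document stores only a 4-byte dictionary ID. */
  static long dictionaryEncodedBytes(long numDocs, long cardinality, int avgValueBytes) {
    return cardinality * avgValueBytes + numDocs * DICT_ID_BYTES;
  }

  public static void main(String[] args) {
    long numDocs = 1_000_000L;
    int avgValueBytes = 32;
    System.out.println(rawBytes(numDocs, avgValueBytes));                           // 32000000
    System.out.println(dictionaryEncodedBytes(numDocs, 1_000L, avgValueBytes));     // 4032000
    System.out.println(dictionaryEncodedBytes(numDocs, 1_000_000L, avgValueBytes)); // 36000000
  }
}
```

With low cardinality the intermediate structures are far smaller than the raw values; once every value is distinct, the dictionary alone is as large as the raw data and the forward index is added on top.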




[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r597401003



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();

Review comment:
       Updated.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r599121158



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
##########
@@ -0,0 +1,79 @@
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+public class IntermediateSegmentRecordReader implements RecordReader {

Review comment:
       Do we plan to support `sortedColumn` in the record reader?
   
   For instance, realtime segments can sort the data themselves without depending on the input's sort order.
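For context on this question, a minimal sketch of how a record reader could emit rows ordered by a column without relying on input order. It assumes the column is dictionary-encoded with an unsorted dictionary, as in the intermediate segment, and works by sorting doc IDs by the value their dictionary ID maps to. The class and method names are hypothetical; this is not Pinot's implementation.

```java
import java.util.Arrays;
import java.util.Comparator;

final class SortedDocIdOrder {
  /**
   * Returns doc IDs ordered by their value in the sorted column.
   * dictValues[dictId] is the value for a dictionary ID; forwardIndex[docId] is the doc's dictionary ID.
   */
  static int[] sortedDocIds(String[] dictValues, int[] forwardIndex) {
    // Sort the (unsorted) dictionary IDs by value once, then record each ID's rank.
    Integer[] dictIds = new Integer[dictValues.length];
    for (int i = 0; i < dictIds.length; i++) {
      dictIds[i] = i;
    }
    Arrays.sort(dictIds, Comparator.comparing(id -> dictValues[id]));
    int[] rank = new int[dictValues.length];
    for (int r = 0; r < dictIds.length; r++) {
      rank[dictIds[r]] = r;
    }

    // Arrays.sort on objects is stable, so docs with equal values keep their ingestion order.
    Integer[] docIds = new Integer[forwardIndex.length];
    for (int d = 0; d < docIds.length; d++) {
      docIds[d] = d;
    }
    Arrays.sort(docIds, Comparator.comparingInt(d -> rank[forwardIndex[d]]));
    return Arrays.stream(docIds).mapToInt(Integer::intValue).toArray();
  }

  public static void main(String[] args) {
    String[] dictValues = {"us", "de", "fr"}; // dictionary IDs assigned in arrival order
    int[] forwardIndex = {0, 1, 0, 2, 1};     // docs: us, de, us, fr, de
    System.out.println(Arrays.toString(sortedDocIds(dictValues, forwardIndex))); // [1, 4, 3, 0, 2]
  }
}
```

A reader built this way would return `getRow(docId)` in the order given by `sortedDocIds`, so the downstream driver would see input already sorted on that column.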

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java
##########
@@ -0,0 +1,56 @@
+package org.apache.pinot.core.segment.creator;
+
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateSegmentSegmentCreationDataSource implements SegmentCreationDataSource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegmentSegmentCreationDataSource.class);
+  private final IntermediateSegment _intermediateSegment;
+  private final IntermediateSegmentRecordReader _intermediateSegmentRecordReader;
+
+
+  public IntermediateSegmentSegmentCreationDataSource(IntermediateSegmentRecordReader intermediateSegmentRecordReader) {
+    _intermediateSegmentRecordReader = intermediateSegmentRecordReader;
+    _intermediateSegment = _intermediateSegmentRecordReader.getIntermediateSegment();
+  }
+
+  @Override
+  public SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig) {
+

Review comment:
       remove line
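The `gatherStats` method quoted above is presumably where the one-pass design pays off. The intermediate segment already holds a dictionary per column, so stats such as cardinality, min and max can be read from that dictionary instead of from a second scan of the raw data. The following is a minimal sketch of that idea, assuming a LONG column whose distinct values are available as an array. The class is hypothetical, and the real implementation may collect different stats.

```java
// Hypothetical sketch, not Pinot's SegmentPreIndexStatsContainer: per-column stats read
// from a dictionary that was already filled while rows were ingested.
class DictionaryStatsSketch {
  final int cardinality;
  final long minValue;
  final long maxValue;

  DictionaryStatsSketch(long[] dictionaryValues) {
    if (dictionaryValues.length == 0) {
      throw new IllegalArgumentException("empty dictionary");
    }
    // Each distinct value appears once in the dictionary, so its size is the cardinality.
    cardinality = dictionaryValues.length;
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    // This loop touches `cardinality` values, not every raw record.
    for (long value : dictionaryValues) {
      min = Math.min(min, value);
      max = Math.max(max, value);
    }
    minValue = min;
    maxValue = max;
  }

  public static void main(String[] args) {
    DictionaryStatsSketch stats = new DictionaryStatsSketch(new long[] {42, 7, 19});
    System.out.println(stats.cardinality + " " + stats.minValue + " " + stats.maxValue); // 3 7 42
  }
}
```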

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
##########
@@ -679,4 +680,12 @@ public boolean isFailOnEmptySegment() {
   public void setFailOnEmptySegment(boolean failOnEmptySegment) {
     _failOnEmptySegment = failOnEmptySegment;
   }
+
+  public boolean isIntermediateSegmentRecordReader() {

Review comment:
       Why do we need to add this new config to `SegmentGeneratorConfig`? Can't we simply pass `IntermediateSegmentRecordReader` when we want to use the intermediate-segment-based record reader?
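The suggestion amounts to dispatching on the reader's runtime type instead of on a separate boolean flag. Here is a minimal sketch of that pattern. The `RecordSource` types are hypothetical stand-ins rather than Pinot's `RecordReader` hierarchy, and the method returns the data-source class names as strings instead of constructing them.

```java
// Hypothetical sketch: pick the creation data source from the reader's runtime type,
// so SegmentGeneratorConfig needs no extra boolean. The types below are stand-ins,
// not Pinot's RecordReader hierarchy.
class DataSourceSelector {
  static String dataSourceFor(RecordSource reader) {
    if (reader instanceof IntermediateRecordSource) {
      // Stats come from the in-memory intermediate segment; no second pass over raw data.
      return "IntermediateSegmentSegmentCreationDataSource";
    }
    // Any other reader keeps the existing path, which scans the input to gather stats.
    return "RecordReaderSegmentCreationDataSource";
  }

  public static void main(String[] args) {
    System.out.println(dataSourceFor(new FileRecordSource()));
    System.out.println(dataSourceFor(new IntermediateRecordSource()));
  }
}

interface RecordSource {}

class FileRecordSource implements RecordSource {}

class IntermediateRecordSource implements RecordSource {}
```

Because the decision follows from the object that is actually passed in, the config and the reader can never disagree.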

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
##########
@@ -33,15 +33,15 @@
  *
  * TODO: Gather more info on the fly to avoid scanning the segment
  */
-public class RealtimeColumnStatistics implements ColumnStatistics {
+public class MutableColumnStatistics implements ColumnStatistics {

Review comment:
       Do we ever read this from the controller? Can you confirm that this is only used by the server?
   
   If this is used by multiple components, renaming it may cause incompatibilities during deployment.
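The rename only breaks compatibility if some component refers to the class by its old name at runtime. That could happen if a fully qualified name is persisted and later resolved via reflection while a rolling upgrade mixes old and new builds. The source does not say whether Pinot does this for these statistics classes. The sketch below only illustrates the general mechanism with the JDK's `Class.forName`. The old Pinot class name appears purely as an example string.

```java
// Illustrative sketch: resolving a class by name fails once that name no longer exists
// on the classpath, which is how a rename can break a component still using the old name.
class RenameCompatSketch {
  /** Returns true if a class with this fully qualified name can be loaded. */
  static boolean isLoadable(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isLoadable("java.util.ArrayList")); // true on any JDK
    // An old name kept by another component; false in this standalone program,
    // which has no Pinot classes on its classpath.
    System.out.println(isLoadable("org.apache.pinot.core.realtime.converter.stats.RealtimeColumnStatistics"));
  }
}
```

If the classes are only referenced in compiled code within a single process, the rename is a compile-time change and this concern does not apply.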

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
##########
@@ -28,11 +28,11 @@
 import static org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
 
 
-public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
+public class MutableNoDictionaryColStatistics implements ColumnStatistics {

Review comment:
       Do we ever read this from the controller? Can you confirm that this is only used by the server?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r599155234



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
##########
@@ -679,4 +680,12 @@ public boolean isFailOnEmptySegment() {
   public void setFailOnEmptySegment(boolean failOnEmptySegment) {
     _failOnEmptySegment = failOnEmptySegment;
   }
+
+  public boolean isIntermediateSegmentRecordReader() {

Review comment:
       Good point. Removed this config.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r599155534



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
##########
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+public class IntermediateSegmentRecordReader implements RecordReader {

Review comment:
       Sure, let's address it when we unify all the offline ingestion paths to use the intermediate segment.






[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r597401088



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);

Review comment:
       We don't need two copies here. Updated.






[GitHub] [incubator-pinot] jackjlli merged pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli merged pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479


   




[GitHub] [incubator-pinot] jackjlli commented on a change in pull request #6479: Support data ingestion for generating offline segment in one pass

Posted by GitBox <gi...@apache.org>.
jackjlli commented on a change in pull request #6479:
URL: https://github.com/apache/incubator-pinot/pull/6479#discussion_r597400849



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);
+
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();

Review comment:
       Updated to use off-heap allocation unconditionally.
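The constructor quoted in the next hunk picks `MmapMemoryManager` only when off-heap allocation is enabled and direct off-heap allocation is disabled. Every other combination, including `offHeap == false`, falls through to `DirectMemoryManager`. The sketch below mirrors that branch as quoted, with a hypothetical enum standing in for the two manager classes. With `offHeap` fixed to `true`, as the reply describes, only the direct-allocation flag still matters.

```java
// Sketch of the memory-manager branch quoted in the IntermediateSegment constructor;
// the enum is a hypothetical stand-in for MmapMemoryManager / DirectMemoryManager.
class MemoryManagerChoiceSketch {
  enum Kind { MMAP, DIRECT }

  static Kind choose(boolean offHeap, boolean directOffHeap) {
    // Mmap only for "off-heap but not direct"; everything else uses direct memory.
    return (offHeap && !directOffHeap) ? Kind.MMAP : Kind.DIRECT;
  }

  public static void main(String[] args) {
    for (boolean offHeap : new boolean[] {false, true}) {
      for (boolean direct : new boolean[] {false, true}) {
        System.out.println("offHeap=" + offHeap + " directOffHeap=" + direct + " -> " + choose(offHeap, direct));
      }
    }
  }
}
```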

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
##########
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Intermediate segment format to store the collected data so far. This segment format will be used to generate the final
+ * offline segment in SegmentIndexCreationDriver.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  private static final int DEFAULT_CAPACITY = 100_000;
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
+    List<FieldSpec> physicalFieldSpecs = new ArrayList<>(allFieldSpecs.size());
+    physicalFieldSpecs.addAll(allFieldSpecs);
+    Collection<FieldSpec> physicalFieldSpecs1 = Collections.unmodifiableCollection(physicalFieldSpecs);
+
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
+    boolean directOffHeap = indexLoadingConfig.isDirectRealtimeOffHeapAllocation();
+    if (offHeap && !directOffHeap) {
+      _memoryManager = new MmapMemoryManager(null, _segmentName, null);
+    } else {
+      _memoryManager = new DirectMemoryManager(_segmentName, null);
+    }
+
+    // Initialize for each column
+    for (FieldSpec fieldSpec : physicalFieldSpecs1) {
+      String column = fieldSpec.getName();
+
+      // Partition info
+      PartitionFunction partitionFunction = null;
+      Set<Integer> partitions = null;
+      if (column.equals(_partitionColumn)) {
+        partitionFunction = _partitionFunction;
+        partitions = new HashSet<>();
+        partitions.add(segmentGeneratorConfig.getSequenceId());
+      }
+
+      FieldSpec.DataType dataType = fieldSpec.getDataType();
+      boolean isFixedWidthColumn = dataType.isFixedWidth();
+      MutableForwardIndex forwardIndex;
+      MutableDictionary dictionary;
+
+      int dictionaryColumnSize;
+      if (isFixedWidthColumn) {
+        dictionaryColumnSize = dataType.size();
+      } else {
+        dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
+      }
+      // NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary
+      int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
+      String dictionaryAllocationContext =
+          buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
+      dictionary = MutableDictionaryFactory
+          .getMutableDictionary(dataType, offHeap, _memoryManager, dictionaryColumnSize,
+              Math.min(estimatedCardinality, _capacity), dictionaryAllocationContext);
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+        forwardIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, _capacity, _memoryManager,
+            allocationContext);
+      } else {
+        // Multi-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+        // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+        forwardIndex = new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
+            indexLoadingConfig.getRealtimeAvgMultiValueCount(), _capacity, Integer.BYTES, _memoryManager,

Review comment:
       Removed.
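For reference, the dictionary sizing in the quoted constructor reduces to simple arithmetic on its constants. The per-entry size is the data type's fixed width, or `DEFAULT_EST_AVG_COL_SIZE` (32) for variable-width types. The initial cardinality is `DEFAULT_EST_CARDINALITY` (5000) plus a 10% buffer, capped at `_capacity` (100,000). This sketch reproduces only that arithmetic. The boolean/byte-width parameters are a simplification of `FieldSpec.DataType`, not its API.

```java
// Sketch of the dictionary-sizing arithmetic from the quoted IntermediateSegment constructor.
// Constants are copied from the diff; the method parameters simplify FieldSpec.DataType.
class DictionarySizingSketch {
  static final int DEFAULT_CAPACITY = 100_000;
  static final int DEFAULT_EST_AVG_COL_SIZE = 32;
  static final int DEFAULT_EST_CARDINALITY = 5000;

  /** Bytes per dictionary entry: the type's fixed width, else the var-width estimate. */
  static int dictionaryColumnSize(boolean fixedWidth, int fixedWidthBytes) {
    return fixedWidth ? fixedWidthBytes : DEFAULT_EST_AVG_COL_SIZE;
  }

  /** Estimated cardinality plus a 10% buffer, capped at the segment capacity. */
  static int initialDictionaryCardinality() {
    int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
    return Math.min(estimatedCardinality, DEFAULT_CAPACITY);
  }

  public static void main(String[] args) {
    System.out.println(dictionaryColumnSize(true, Long.BYTES)); // 8 (e.g. a LONG column)
    System.out.println(dictionaryColumnSize(false, 0));         // 32 (e.g. a STRING column)
    System.out.println(initialDictionaryCardinality());         // 5500
  }
}
```

With these defaults the cap never kicks in (5500 < 100,000). It would only matter if the estimate or buffer grew past the segment capacity.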



