You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/12/07 19:59:50 UTC
[pinot] branch master updated: load startree index via segment reader interface (#9828)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d39b844c9c load startree index via segment reader interface (#9828)
d39b844c9c is described below
commit d39b844c9c305b87596359ea7df0372f73673625
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Dec 7 11:59:42 2022 -0800
load startree index via segment reader interface (#9828)
---
.../immutable/ImmutableSegmentLoader.java | 7 ++---
.../local/segment/store/FilePerIndexDirectory.java | 33 +++++++++++++++++++-
.../segment/store/SegmentLocalFSDirectory.java | 13 ++++++++
.../segment/store/SingleFileIndexDirectory.java | 36 ++++++++++++++++++++++
.../startree/v2/store/StarTreeIndexContainer.java | 32 ++++++-------------
.../startree/v2/store/StarTreeIndexMapUtils.java | 9 +++---
.../store/SingleFileIndexDirectoryTest.java | 13 ++++++++
.../segment/spi/store/ColumnIndexDirectory.java | 14 +++++++++
.../pinot/segment/spi/store/SegmentDirectory.java | 14 +++++++++
.../airlineStats_offline_table_config.json | 30 ++++++++++++++++++
10 files changed, 167 insertions(+), 34 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index ee9ef452c8..7764d2ba25 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -197,13 +197,10 @@ public class ImmutableSegmentLoader {
}
}
- // FIXME: star tree only works with local SegmentDirectory
// Load star-tree index if it exists
StarTreeIndexContainer starTreeIndexContainer = null;
- if (segmentMetadata.getStarTreeV2MetadataList() != null && localIndexDir != null) {
- starTreeIndexContainer =
- new StarTreeIndexContainer(SegmentDirectoryPaths.findSegmentDirectory(localIndexDir), segmentMetadata,
- indexContainerMap, indexLoadingConfig.getReadMode());
+ if (segmentMetadata.getStarTreeV2MetadataList() != null) {
+ starTreeIndexContainer = new StarTreeIndexContainer(segmentReader, segmentMetadata, indexContainerMap);
}
ImmutableSegmentImpl segment =
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index 15b02d6d2f..a85e5b629c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -21,7 +21,9 @@ package org.apache.pinot.segment.local.segment.store;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.HashSet;
@@ -31,6 +33,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -42,7 +45,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
private SegmentMetadataImpl _segmentMetadata;
private final ReadMode _readMode;
private final Map<IndexKey, PinotDataBuffer> _indexBuffers = new HashMap<>();
-
+ // Different from the other column-index entries, starTree index is multi-column index and has its own index map,
+ // thus manage it separately.
+ private PinotDataBuffer _starTreeIndexDataBuffer;
/**
* @param segmentDirectory File pointing to segment directory
* @param segmentMetadata segment metadata. Metadata must be fully initialized
@@ -75,6 +80,29 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
return getReadBufferFor(key);
}
+ @Override
+ public PinotDataBuffer getStarTreeIndex()
+ throws IOException {
+ if (_starTreeIndexDataBuffer != null) {
+ return _starTreeIndexDataBuffer;
+ }
+ File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
+ if (_readMode == ReadMode.heap) {
+ _starTreeIndexDataBuffer =
+ PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer");
+ } else {
+ _starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN,
+ "Star-tree V2 data buffer");
+ }
+ return _starTreeIndexDataBuffer;
+ }
+
+ @Override
+ public InputStream getStarTreeIndexMap()
+ throws IOException {
+ return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME));
+ }
+
@Override
public PinotDataBuffer newBuffer(String column, ColumnIndexType type, long sizeBytes)
throws IOException {
@@ -94,6 +122,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
for (PinotDataBuffer dataBuffer : _indexBuffers.values()) {
dataBuffer.close();
}
+ if (_starTreeIndexDataBuffer != null) {
+ _starTreeIndexDataBuffer.close();
+ }
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
index 42f6cc4cc0..4edfdd2b70 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
@@ -358,6 +359,18 @@ public class SegmentLocalFSDirectory extends SegmentDirectory {
public String toString() {
return _segmentDirectory.toString();
}
+
+ @Override
+ public PinotDataBuffer getStarTreeIndex()
+ throws IOException {
+ return _columnIndexDirectory.getStarTreeIndex();
+ }
+
+ @Override
+ public InputStream getStarTreeIndexMap()
+ throws IOException {
+ return _columnIndexDirectory.getStarTreeIndexMap();
+ }
}
/*************************** SegmentDirectory Writer *********************/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index 8b0faacb9c..f9824dd5a2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -22,8 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
@@ -41,6 +43,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -84,6 +87,9 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
private final File _indexFile;
private final Map<IndexKey, IndexEntry> _columnEntries;
private final List<PinotDataBuffer> _allocBuffers;
+ // Different from the other column-index entries, starTree index is multi-column index and has its own index map,
+ // thus manage it separately.
+ private PinotDataBuffer _starTreeIndexDataBuffer;
// For V3 segment format, the index cleanup consists of two steps: mark and sweep.
// The removeIndex() method marks an index to be removed; and the index info is
@@ -207,6 +213,21 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
throws IOException, ConfigurationException {
loadMap();
mapBufferEntries();
+ if (_segmentMetadata.getStarTreeV2MetadataList() != null) {
+ loadStarTreeIndex();
+ }
+ }
+
+ private void loadStarTreeIndex()
+ throws IOException {
+ File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
+ if (_readMode == ReadMode.heap) {
+ _starTreeIndexDataBuffer =
+ PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer");
+ } else {
+ _starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN,
+ "Star-tree V2 data buffer");
+ }
}
private void loadMap()
@@ -346,6 +367,9 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
for (PinotDataBuffer buf : _allocBuffers) {
buf.close();
}
+ if (_starTreeIndexDataBuffer != null) {
+ _starTreeIndexDataBuffer.close();
+ }
// Cleanup removed indices after closing and flushing buffers, so
// that potential index updates can be persisted across cleanups.
if (_shouldCleanupRemovedIndices) {
@@ -389,6 +413,18 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
return columns;
}
+ @Override
+ public PinotDataBuffer getStarTreeIndex()
+ throws IOException {
+ return _starTreeIndexDataBuffer;
+ }
+
+ @Override
+ public InputStream getStarTreeIndexMap()
+ throws IOException {
+ return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME));
+ }
+
@Override
public String toString() {
return _segmentDirectory.toString() + "/" + _indexFile.toString();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java
index d1fe668c7e..3a7220c953 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java
@@ -19,44 +19,30 @@
package org.apache.pinot.segment.local.startree.v2.store;
import java.io.Closeable;
-import java.io.File;
import java.io.IOException;
-import java.nio.ByteOrder;
+import java.io.InputStream;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
-import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
-import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.apache.pinot.spi.utils.ReadMode;
-
-import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexKey;
-import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexValue;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
/**
* The {@code StarTreeIndexContainer} class contains the indexes for multiple star-trees.
*/
public class StarTreeIndexContainer implements Closeable {
- private final PinotDataBuffer _dataBuffer;
private final List<StarTreeV2> _starTrees;
- public StarTreeIndexContainer(File segmentDirectory, SegmentMetadataImpl segmentMetadata,
- Map<String, ColumnIndexContainer> indexContainerMap, ReadMode readMode)
+ public StarTreeIndexContainer(SegmentDirectory.Reader segmentReader, SegmentMetadataImpl segmentMetadata,
+ Map<String, ColumnIndexContainer> indexContainerMap)
throws IOException {
- File indexFile = new File(segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
- if (readMode == ReadMode.heap) {
- _dataBuffer = PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.LITTLE_ENDIAN,
- "Star-tree V2 data buffer");
- } else {
- _dataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.LITTLE_ENDIAN,
- "Star-tree V2 data buffer");
+ try (InputStream inputStream = segmentReader.getStarTreeIndexMap()) {
+ _starTrees = StarTreeLoaderUtils.loadStarTreeV2(segmentReader.getStarTreeIndex(),
+ StarTreeIndexMapUtils.loadFromInputStream(inputStream, segmentMetadata.getStarTreeV2MetadataList().size()),
+ segmentMetadata, indexContainerMap);
}
- File indexMapFile = new File(segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME);
- List<Map<IndexKey, IndexValue>> indexMapList =
- StarTreeIndexMapUtils.loadFromFile(indexMapFile, segmentMetadata.getStarTreeV2MetadataList().size());
- _starTrees = StarTreeLoaderUtils.loadStarTreeV2(_dataBuffer, indexMapList, segmentMetadata, indexContainerMap);
}
public List<StarTreeV2> getStarTrees() {
@@ -66,6 +52,6 @@ public class StarTreeIndexContainer implements Closeable {
@Override
public void close()
throws IOException {
- _dataBuffer.close();
+ // The startree index buffer is owned by segment reader now.
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java
index da98506dc1..83747def59 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -166,17 +167,15 @@ public class StarTreeIndexMapUtils {
}
/**
- * Loads the index maps for multiple star-trees from a file.
+ * Loads the index maps for multiple star-trees from an input stream.
*/
- public static List<Map<IndexKey, IndexValue>> loadFromFile(File indexMapFile, int numStarTrees) {
- Preconditions.checkState(indexMapFile.exists(), "Star-tree index map file does not exist");
-
+ public static List<Map<IndexKey, IndexValue>> loadFromInputStream(InputStream indexMapInputStream, int numStarTrees) {
List<Map<IndexKey, IndexValue>> indexMaps = new ArrayList<>(numStarTrees);
for (int i = 0; i < numStarTrees; i++) {
indexMaps.add(new HashMap<>());
}
- PropertiesConfiguration configuration = CommonsConfigurationUtils.fromFile(indexMapFile);
+ PropertiesConfiguration configuration = CommonsConfigurationUtils.fromInputStream(indexMapInputStream);
for (String key : CommonsConfigurationUtils.getKeys(configuration)) {
String[] split = StringUtils.split(key, KEY_SEPARATOR);
int starTreeId = Integer.parseInt(split[0]);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
index f065a191e5..98c32a93b0 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.store;
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
@@ -54,6 +55,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class SingleFileIndexDirectoryTest {
@@ -82,6 +84,7 @@ public class SingleFileIndexDirectoryTest {
void writeMetadata() {
SegmentMetadataImpl meta = Mockito.mock(SegmentMetadataImpl.class);
Mockito.when(meta.getVersion()).thenReturn(SegmentVersion.v3);
+ Mockito.when(meta.getStarTreeV2MetadataList()).thenReturn(null);
_segmentMetadata = meta;
}
@@ -391,4 +394,14 @@ public class SingleFileIndexDirectoryTest {
new HashSet<>(Collections.singletonList("bar")));
}
}
+
+ @Test(expectedExceptions = FileNotFoundException.class, expectedExceptionsMessageRegExp = ".*star_tree_index.*")
+ public void testLoadStarTreeIndex()
+ throws Exception {
+ Mockito.when(_segmentMetadata.getStarTreeV2MetadataList()).thenReturn(Collections.emptyList());
+ try (SingleFileIndexDirectory ignore = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) {
+ // Trying to load startree index but not able to find the file.
+ fail();
+ }
+ }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
index ca343dbfa2..eee9e90990 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.spi.store;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Set;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -104,4 +105,17 @@ public abstract class ColumnIndexDirectory implements Closeable {
*/
public void releaseBuffer(FetchContext fetchContext) {
}
+
+ public PinotDataBuffer getStarTreeIndex()
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * The caller should close the input stream.
+ */
+ public InputStream getStarTreeIndexMap()
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 889f1b5a4b..7ba897a01d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.spi.store;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Set;
@@ -185,6 +186,19 @@ public abstract class SegmentDirectory implements Closeable {
}
public abstract String toString();
+
+ public PinotDataBuffer getStarTreeIndex()
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * The caller should close the input stream.
+ */
+ public InputStream getStarTreeIndexMap()
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
/**
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
index b761d9437b..0c5c7bfd2d 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json
@@ -24,6 +24,36 @@
}
],
"tableIndexConfig": {
+ "starTreeIndexConfigs": [
+ {
+ "dimensionsSplitOrder": [
+ "AirlineID",
+ "Origin",
+ "Dest"
+ ],
+ "skipStarNodeCreationForDimensions": [],
+ "functionColumnPairs": [
+ "COUNT__*",
+ "MAX__ArrDelay"
+ ],
+ "maxLeafRecords": 10
+ },
+ {
+ "dimensionsSplitOrder": [
+ "Carrier",
+ "CancellationCode",
+ "Origin",
+ "Dest"
+ ],
+ "skipStarNodeCreationForDimensions": [],
+ "functionColumnPairs": [
+ "MAX__CarrierDelay",
+ "AVG__CarrierDelay"
+ ],
+ "maxLeafRecords": 10
+ }
+ ],
+ "enableDynamicStarTreeCreation": true,
"loadMode": "MMAP"
},
"metadata": {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org