Posted to commits@pinot.apache.org by ja...@apache.org on 2022/11/04 03:53:28 UTC
[pinot] branch master updated: Fix TIMESTAMP index handling in SegmentMapper (#9722)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5988ff3671 Fix TIMESTAMP index handling in SegmentMapper (#9722)
5988ff3671 is described below
commit 5988ff3671a660ce37663bdd2fba4036b2432782
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Nov 3 20:53:23 2022 -0700
Fix TIMESTAMP index handling in SegmentMapper (#9722)
---
.../segment/processing/mapper/SegmentMapper.java | 8 +-
.../processing/framework/SegmentMapperTest.java | 30 ++--
...fflineSegmentsMinionClusterIntegrationTest.java | 187 +++++++++++----------
3 files changed, 118 insertions(+), 107 deletions(-)
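In short: SegmentMapper previously took its field specs straight from processorConfig.getSchema(), so the derived columns that back a TIMESTAMP index were not carried into the segments produced by segment-processing tasks. The fix extracts the timestamp index configs from the table config and folds the derived columns into the schema (via SegmentGeneratorConfig.updateSchemaWithTimestampIndexes) before the field specs are computed, mirroring what the segment-generation path already does.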
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index f20b8631f6..1caca8be5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerFactory;
import org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -81,9 +82,10 @@ public class SegmentMapper {
_mapperOutputDir = mapperOutputDir;
TableConfig tableConfig = processorConfig.getTableConfig();
- Schema schema = processorConfig.getSchema();
- Pair<List<FieldSpec>, Integer> pair = SegmentProcessorUtils
- .getFieldSpecs(schema, processorConfig.getMergeType(), tableConfig.getIndexingConfig().getSortedColumn());
+ Schema schema = SegmentGeneratorConfig.updateSchemaWithTimestampIndexes(processorConfig.getSchema(),
+ SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(tableConfig));
+ Pair<List<FieldSpec>, Integer> pair = SegmentProcessorUtils.getFieldSpecs(schema, processorConfig.getMergeType(),
+ tableConfig.getIndexingConfig().getSortedColumn());
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
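For context, a TIMESTAMP index materializes one derived column per configured granularity, named $<column>$<GRANULARITY> and holding the raw epoch value truncated to that granularity; the test changes below assert exactly this for DAY. Here is a minimal sketch of that convention, with hypothetical class and method names (illustrative only, not Pinot's actual internals):

import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "$<column>$<GRANULARITY>" naming and the epoch
// truncation that timestamp-index derived columns follow, as asserted in
// SegmentMapperTest below. Not Pinot's actual implementation.
public final class TimestampIndexSketch {
  private TimestampIndexSketch() {
  }

  // Derived column name for a timestamp-index column at a given granularity.
  static String derivedColumnName(String column, String granularity) {
    return "$" + column + "$" + granularity;
  }

  // Truncate epoch millis to a fixed-size bucket; for DAY this matches the
  // "(long) value / 86400000 * 86400000" arithmetic used in the test below.
  // (WEEK and MONTH need calendar logic and are omitted from this sketch.)
  static long truncate(long epochMillis, TimeUnit granularity) {
    long bucketMillis = granularity.toMillis(1);
    return epochMillis / bucketMillis * bucketMillis;
  }

  public static void main(String[] args) {
    System.out.println(derivedColumnName("ts", "DAY"));          // $ts$DAY
    System.out.println(truncate(1597719600000L, TimeUnit.DAYS)); // 1597708800000
  }
}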
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 5e83d715b5..748740b666 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -39,9 +39,12 @@ import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
@@ -63,13 +66,16 @@ public class SegmentMapperTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentMapperTest");
private final TableConfig _tableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("timeValue")
- .setNullHandlingEnabled(true).build();
- private final Schema _schema = new Schema.SchemaBuilder().setSchemaName("myTable")
- .addSingleValueDimension("campaign", FieldSpec.DataType.STRING, "xyz").addMetric("clicks", FieldSpec.DataType.INT)
- .addDateTime("timeValue", FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
- private final List<Object[]> _rawData = Arrays
- .asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr", 2000, 1597773600000L},
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("ts")
+ .setNullHandlingEnabled(true).setFieldConfigList(Collections.singletonList(
+ new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null,
+ new TimestampConfig(Collections.singletonList(TimestampIndexGranularity.DAY)), null))).build();
+ private final Schema _schema =
+ new Schema.SchemaBuilder().setSchemaName("myTable").addSingleValueDimension("campaign", DataType.STRING, "xyz")
+ .addMetric("clicks", DataType.INT).addDateTime("ts", DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS")
+ .build();
+ private final List<Object[]> _rawData =
+ Arrays.asList(new Object[]{"abc", 1000, 1597719600000L}, new Object[]{"pqr", 2000, 1597773600000L},
new Object[]{"abc", 1000, 1597777200000L}, new Object[]{"abc", 4000, 1597795200000L},
new Object[]{"abc", 3000, 1597802400000L}, new Object[]{"pqr", 1000, 1597838400000L},
new Object[]{null, 4000, 1597856400000L}, new Object[]{"pqr", 1000, 1597878000000L},
@@ -97,7 +103,7 @@ public class SegmentMapperTest {
row.putDefaultNullValue("campaign", "xyz");
}
row.putValue("clicks", rawRow[1]);
- row.putValue("timeValue", rawRow[2]);
+ row.putValue("ts", rawRow[2]);
inputRows.add(row);
}
@@ -154,7 +160,9 @@ public class SegmentMapperTest {
Object[] expectedValues = expectedRecords.get(i);
assertEquals(buffer.getValue("campaign"), expectedValues[0]);
assertEquals(buffer.getValue("clicks"), expectedValues[1]);
- assertEquals(buffer.getValue("timeValue"), expectedValues[2]);
+ assertEquals(buffer.getValue("ts"), expectedValues[2]);
+ // TIMESTAMP index
+ assertEquals(buffer.getValue("$ts$DAY"), (long) expectedValues[2] / 86400000 * 86400000);
// Default null value
if (expectedValues[0].equals("xyz")) {
assertEquals(buffer.getNullValueFields(), Collections.singleton("campaign"));
@@ -214,7 +222,7 @@ public class SegmentMapperTest {
new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setPartitionerConfigs(
Collections.singletonList(new PartitionerConfig.Builder()
.setPartitionerType(PartitionerFactory.PartitionerType.TRANSFORM_FUNCTION)
- .setTransformFunction("toEpochDays(timeValue)").build())).build();
+ .setTransformFunction("toEpochDays(ts)").build())).build();
Map<String, List<Object[]>> expectedRecords3 =
outputData.stream().collect(Collectors.groupingBy(r -> "0_" + ((long) r[2] / 86400000), Collectors.toList()));
inputs.add(new Object[]{config3, expectedRecords3});
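A worked example of the DAY bucket arithmetic asserted above: the first two raw rows carry timestamps 1597719600000 and 1597773600000; both divide to 18492 full days under integer division (1597719600000 / 86400000 = 18492), so both rows get $ts$DAY = 18492 * 86400000 = 1597708800000.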
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 218ab7e01a..a8eb0a4c40 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -18,71 +18,124 @@
*/
package org.apache.pinot.integration.tests;
-import java.io.IOException;
+import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.task.TaskState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TimestampConfig;
+import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
/**
* Integration test for the minion task of type "RealtimeToOfflineSegmentsTask".
* With every task run, a new segment covering one day of data is created in the offline table, and the watermark
* advances accordingly.
*/
-public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest {
-
- private PinotHelixTaskResourceManager _helixTaskResourceManager;
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseClusterIntegrationTestSet {
+ private PinotHelixTaskResourceManager _taskResourceManager;
private PinotTaskManager _taskManager;
- private PinotHelixResourceManager _pinotHelixResourceManager;
-
- private long _dataSmallestTimeMs;
private String _realtimeTableName;
private String _offlineTableName;
+ private long _dataSmallestTimeMs;
@Override
- protected TableTaskConfig getTaskConfig() {
- return new TableTaskConfig(
- Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>()));
+ protected SegmentPartitionConfig getSegmentPartitionConfig() {
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+ ColumnPartitionConfig columnOneConfig = new ColumnPartitionConfig("murmur", 3);
+ columnPartitionConfigMap.put("AirlineID", columnOneConfig);
+ ColumnPartitionConfig columnTwoConfig = new ColumnPartitionConfig("hashcode", 2);
+ columnPartitionConfigMap.put("OriginAirportID", columnTwoConfig);
+ return new SegmentPartitionConfig(columnPartitionConfigMap);
}
@BeforeClass
public void setUp()
throws Exception {
- // Setup realtime table, and blank offline table
- super.setUp();
- addTableConfig(createOfflineTableConfig());
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServer();
startMinion();
- _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
+ // Start Kafka
+ startKafka();
+
+ // Unpack the Avro files
+ List<File> avroFiles = unpackAvroData(_tempDir);
+
+ // Create and upload the schema and table configs with a TIMESTAMP field
+ Schema schema = createSchema();
+ schema.addField(new DateTimeFieldSpec("ts", DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS"));
+ addSchema(schema);
+
+ TableConfig realtimeTableConfig = createRealtimeTableConfig(avroFiles.get(0));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new TransformConfig("ts", "fromEpochDays(DaysSinceEpoch)")));
+ realtimeTableConfig.setIngestionConfig(ingestionConfig);
+ FieldConfig tsFieldConfig =
+ new FieldConfig("ts", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.TIMESTAMP, null, null,
+ new TimestampConfig(Arrays.asList(TimestampIndexGranularity.HOUR, TimestampIndexGranularity.DAY,
+ TimestampIndexGranularity.WEEK, TimestampIndexGranularity.MONTH)), null);
+ realtimeTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
+ realtimeTableConfig.setTaskConfig(new TableTaskConfig(
+ Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>())));
+ addTableConfig(realtimeTableConfig);
+
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ offlineTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
+ addTableConfig(offlineTableConfig);
+
+ // Push data into Kafka
+ pushAvroIntoKafka(avroFiles);
+
+ // Set up the H2 connection
+ setUpH2Connection(avroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+
+ _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
_taskManager = _controllerStarter.getTaskManager();
- _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
-
_realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
_offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
- List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_realtimeTableName);
+ List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_realtimeTableName);
long minSegmentTimeMs = Long.MAX_VALUE;
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
if (segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
@@ -94,9 +147,9 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseR
@Test
public void testRealtimeToOfflineSegmentsTask()
- throws IOException {
- List<SegmentZKMetadata> segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_offlineTableName);
- Assert.assertTrue(segmentsZKMetadata.isEmpty());
+ throws Exception {
+ List<SegmentZKMetadata> segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineTableName);
+ assertTrue(segmentsZKMetadata.isEmpty());
// If segment partitioning is configured, the number of offline segments created per task equals the product
// of the numbers of partitions across all partition columns.
@@ -110,68 +163,41 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseR
long expectedWatermark = _dataSmallestTimeMs + 86400000;
for (int i = 0; i < 3; i++) {
// Schedule task
- Assert.assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
- Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+ assertNotNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertTrue(_taskResourceManager.getTaskQueues().contains(
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
// Should not generate more tasks
- Assert.assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ assertNull(_taskManager.scheduleTasks().get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
// Wait at most 600 seconds for all tasks to reach COMPLETED
waitForTaskToComplete(expectedWatermark);
// Check that the segments landed in the offline table
- segmentsZKMetadata = _pinotHelixResourceManager.getSegmentsZKMetadata(_offlineTableName);
- Assert.assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
+ segmentsZKMetadata = _helixResourceManager.getSegmentsZKMetadata(_offlineTableName);
+ assertEquals(segmentsZKMetadata.size(), (numOfflineSegmentsPerTask * (i + 1)));
long expectedOfflineSegmentTimeMs = expectedWatermark - 86400000;
for (int j = (numOfflineSegmentsPerTask * i); j < segmentsZKMetadata.size(); j++) {
SegmentZKMetadata segmentZKMetadata = segmentsZKMetadata.get(j);
- Assert.assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
- Assert.assertEquals(segmentZKMetadata.getEndTimeMs(), expectedOfflineSegmentTimeMs);
+ assertEquals(segmentZKMetadata.getStartTimeMs(), expectedOfflineSegmentTimeMs);
+ assertEquals(segmentZKMetadata.getEndTimeMs(), expectedOfflineSegmentTimeMs);
if (segmentPartitionConfig != null) {
- Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(),
+ assertEquals(segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().keySet(),
segmentPartitionConfig.getColumnPartitionMap().keySet());
for (String partitionColumn : segmentPartitionConfig.getColumnPartitionMap().keySet()) {
- Assert.assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(partitionColumn).size(), 1);
+ assertEquals(segmentZKMetadata.getPartitionMetadata().getPartitions(partitionColumn).size(), 1);
}
}
}
expectedWatermark += 86400000;
}
- this.testHardcodedQueries();
-
- // Delete the table
- dropRealtimeTable(_realtimeTableName);
-
- // Check if the metadata is cleaned up on table deletion
- verifyTableDelete(_realtimeTableName);
- }
-
- @Nullable
- @Override
- protected SegmentPartitionConfig getSegmentPartitionConfig() {
- Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
- ColumnPartitionConfig columnOneConfig = new ColumnPartitionConfig("murmur", 3);
- columnPartitionConfigMap.put("AirlineID", columnOneConfig);
- ColumnPartitionConfig columnTwoConfig = new ColumnPartitionConfig("hashcode", 2);
- columnPartitionConfigMap.put("OriginAirportID", columnTwoConfig);
- return new SegmentPartitionConfig(columnPartitionConfigMap);
- }
- protected void verifyTableDelete(String tableNameWithType) {
- TestUtils.waitForCondition(input -> {
- // Check if the task metadata is cleaned up
- if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore,
- MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, tableNameWithType) != null) {
- return false;
- }
- return true;
- }, 1_000L, 60_000L, "Failed to delete table");
+ testHardcodedQueries();
}
private void waitForTaskToComplete(long expectedWatermark) {
TestUtils.waitForCondition(input -> {
// Check task state
- for (TaskState taskState : _helixTaskResourceManager.getTaskStates(
+ for (TaskState taskState : _taskResourceManager.getTaskStates(
MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values()) {
if (taskState != TaskState.COMPLETED) {
return false;
@@ -185,43 +211,18 @@ public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseR
.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _realtimeTableName);
RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
znRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
- Assert.assertNotNull(minionTaskMetadata);
- Assert.assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
- }
-
- @Test(enabled = false)
- @Override
- public void testDictionaryBasedQueries() {
- }
-
- @Test(enabled = false)
- @Override
- public void testHardcodedQueries() {
- }
-
- @Test(enabled = false)
- @Override
- public void testQueriesFromQueryFile() {
- }
-
- @Test(enabled = false)
- @Override
- public void testGeneratedQueries() {
- }
-
- @Test(enabled = false)
- @Override
- public void testQueryExceptions() {
- }
-
- @Test(enabled = false)
- @Override
- public void testInstanceShutdown() {
+ assertNotNull(minionTaskMetadata);
+ assertEquals(minionTaskMetadata.getWatermarkMs(), expectedWatermark);
}
@AfterClass
public void tearDown()
throws Exception {
+ dropRealtimeTable(_realtimeTableName);
+ assertNull(MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore,
+ MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, _realtimeTableName));
+ dropOfflineTable(_offlineTableName);
+
stopMinion();
stopServer();
stopBroker();
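Net effect on the integration test: the realtime table now declares a TIMESTAMP index on ts at HOUR, DAY, WEEK, and MONTH granularities, the same FieldConfig is applied to the offline table, and the test verifies that segments produced by RealtimeToOfflineSegmentsTask land in the offline table with the expected time ranges. With the SegmentMapper fix, those relocated segments should carry the same $ts$HOUR/$ts$DAY/$ts$WEEK/$ts$MONTH columns as directly generated segments, so queries that truncate ts at those granularities can keep benefiting from the index.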