You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/12/15 02:05:41 UTC
[incubator-pinot] branch master updated: Filter out virtual-columns in realtime segment conversion path (#3607)
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f26e482 Filter out virtual-columns in realtime segment conversion path (#3607)
f26e482 is described below
commit f26e48284396f9b6a5f61bff06ced71d7e579aaf
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Dec 14 18:05:36 2018 -0800
Filter out virtual-columns in realtime segment conversion path (#3607)
* Fix VirtualColumns being written out while segment is built
---
.../com/linkedin/pinot/common/data/Schema.java | 15 +++++++
.../helix/ControllerRequestURLBuilder.java | 5 +++
.../immutable/ImmutableSegmentLoader.java | 9 ++--
.../converter/RealtimeSegmentConverter.java | 40 ++++++++++-------
.../defaultcolumn/BaseDefaultColumnHandler.java | 2 +-
.../converter/RealtimeSegmentConverterTest.java | 50 ++++++++++++++++++++++
.../pinot/integration/tests/ClusterTest.java | 15 ++++++-
.../tests/LLCRealtimeClusterIntegrationTest.java | 42 ++++++++++++++++++
8 files changed, 154 insertions(+), 24 deletions(-)
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
index 7a6cf7c..f2e6a0c 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -268,6 +269,20 @@ public final class Schema {
+ /**
+ * Returns the names of all columns in this schema except the virtual ones (those for which
+ * {@code isVirtualColumn} returns true). The returned set is a new, mutable copy; modifying it does not
+ * affect this schema.
+ */
@JsonIgnore
@Nonnull
+ public Set<String> getPhysicalColumnNames() {
+ // Collect only the non-virtual columns instead of copying every name and removing the virtual ones afterwards.
+ Set<String> physicalColumnNames = new HashSet<>();
+ for (String column : _fieldSpecMap.keySet()) {
+ if (!isVirtualColumn(column)) {
+ physicalColumnNames.add(column);
+ }
+ }
+ return physicalColumnNames;
+ }
+
+ @JsonIgnore
+ @Nonnull
public Collection<FieldSpec> getAllFieldSpecs() {
return _fieldSpecMap.values();
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..e7dfa77 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -154,6 +154,11 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, query);
}
+ /**
+ * Builds the controller URL {@code <baseUrl>/tables/<tableName>/segments/reload?type=<tableType>} for the
+ * segment reload endpoint. A trailing "/" on the base URL is removed before joining.
+ *
+ * @param tableName name of the table whose segments are reloaded
+ * @param tableType value of the {@code type} query parameter (e.g. "realtime")
+ */
+ public String forTableReload(String tableName, String tableType) {
+ String query = "reload?type=" + tableType;
+ return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", query);
+ }
+
public String forTableUpdateIndexingConfigs(String tableName) {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "indexingConfigs");
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 64ea1af..b1313bf 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -113,15 +113,12 @@ public class ImmutableSegmentLoader {
new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig));
}
- // Synthesize schema if necessary, adding virtual columns
- if (schema != null) {
- VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
- } else {
+ // Fall back to the schema stored in the segment metadata when no schema is supplied.
+ if (schema == null) {
schema = segmentMetadata.getSchema();
}
- // Ensure that the schema in the segment metadata also has the virtual columns added
- VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(segmentMetadata.getSchema());
+ // Ensure that the schema has the virtual columns added. Only {@code schema} is modified here: when an explicit
+ // schema is supplied, segmentMetadata.getSchema() is not touched by this call.
+ VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
// Instantiate virtual columns
for (String columnName : schema.getColumnNames()) {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 4608e03..af674b1 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -15,6 +15,7 @@
*/
package com.linkedin.pinot.core.realtime.converter;
+import com.google.common.annotations.VisibleForTesting;
import com.linkedin.pinot.common.config.ColumnPartitionConfig;
import com.linkedin.pinot.common.config.SegmentPartitionConfig;
import com.linkedin.pinot.common.data.FieldSpec;
@@ -57,28 +58,14 @@ public class RealtimeSegmentConverter {
if (new File(outputPath).exists()) {
throw new IllegalAccessError("path already exists:" + outputPath);
}
- TimeFieldSpec original = schema.getTimeFieldSpec();
- // Use outgoing granularity for creating segment
- TimeGranularitySpec outgoing = original.getOutgoingGranularitySpec();
-
- TimeFieldSpec newTimeSpec = new TimeFieldSpec(outgoing);
- Schema newSchema = new Schema();
- for (String dimension : schema.getDimensionNames()) {
- newSchema.addField(schema.getFieldSpecFor(dimension));
- }
- for (String metric : schema.getMetricNames()) {
- newSchema.addField(schema.getFieldSpecFor(metric));
- }
-
- newSchema.addField(newTimeSpec);
this.realtimeSegmentImpl = realtimeSegment;
this.outputPath = outputPath;
this.invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
if (sortedColumn != null && this.invertedIndexColumns.contains(sortedColumn)) {
this.invertedIndexColumns.remove(sortedColumn);
}
- this.dataSchema = newSchema;
+ // Segment schema: physical (non-virtual) columns only, with the time column at its outgoing granularity.
+ this.dataSchema = getUpdatedSchema(schema);
this.sortedColumn = sortedColumn;
this.tableName = tableName;
this.segmentName = segmentName;
@@ -150,4 +137,27 @@ public class RealtimeSegmentConverter {
}
}
}
+
+ /**
+ * Returns a new schema derived from {@code original} for building the immutable segment. Virtual columns are
+ * dropped, and the time field spec is replaced by one built from its outgoing granularity.
+ *
+ * @param original schema of the realtime table; must contain a time field spec
+ * @return a new schema with the physical columns of {@code original} and the converted time column
+ * @throws IllegalArgumentException if {@code original} has no time field spec
+ */
+ @VisibleForTesting
+ public Schema getUpdatedSchema(Schema original) {
+ TimeFieldSpec originalTimeSpec = original.getTimeFieldSpec();
+ if (originalTimeSpec == null) {
+ throw new IllegalArgumentException("Schema must have a time field spec to convert a realtime segment");
+ }
+ // Use outgoing granularity for creating segment
+ TimeGranularitySpec outgoing = originalTimeSpec.getOutgoingGranularitySpec();
+ Schema newSchema = new Schema();
+ newSchema.addField(new TimeFieldSpec(outgoing));
+
+ // Copy every physical column except the original time column, which was replaced above.
+ for (String column : original.getPhysicalColumnNames()) {
+ if (!column.equals(originalTimeSpec.getName())) {
+ newSchema.addField(original.getFieldSpecFor(column));
+ }
+ }
+ return newSchema;
+ }
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 5abed29..283af0f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -150,7 +150,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
Map<String, DefaultColumnAction> defaultColumnActionMap = new HashMap<>();
// Compute ADD and UPDATE actions.
- Collection<String> columnsInSchema = _schema.getColumnNames();
+ // Only physical columns are considered: virtual columns are excluded so no ADD/UPDATE action is created for them.
+ Collection<String> columnsInSchema = _schema.getPhysicalColumnNames();
for (String column : columnsInSchema) {
FieldSpec fieldSpecInSchema = _schema.getFieldSpecFor(column);
Preconditions.checkNotNull(fieldSpecInSchema);
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java
new file mode 100644
index 0000000..2d094ec
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.realtime.converter;
+
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.data.TimeFieldSpec;
+import com.linkedin.pinot.core.realtime.converter.RealtimeSegmentConverter;
+import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import java.util.concurrent.TimeUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class RealtimeSegmentConverterTest {
+
+ /**
+ * Verifies that {@link RealtimeSegmentConverter#getUpdatedSchema(Schema)} drops the built-in virtual columns and
+ * rebuilds the time field spec from its outgoing granularity.
+ */
+ @Test
+ public void testNoVirtualColumnsInSchema() {
+ Schema schema = new Schema();
+ FieldSpec spec = new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, true);
+ schema.addField(spec);
+ // Incoming time column "col1" in MILLISECONDS, outgoing time column "col2" in DAYS.
+ TimeFieldSpec tfs = new TimeFieldSpec("col1", FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS,
+ "col2", FieldSpec.DataType.LONG, TimeUnit.DAYS);
+ schema.addField(tfs);
+ VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
+ // The dimension and time columns plus the built-in virtual columns.
+ Assert.assertEquals(schema.getColumnNames().size(), 5);
+ Assert.assertEquals(schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.MILLISECONDS);
+
+ RealtimeSegmentConverter converter = new RealtimeSegmentConverter(null, "", schema,
+ "testTable", "col1", "segment1", "col1");
+
+ Schema newSchema = converter.getUpdatedSchema(schema);
+ // Only the two physical columns may remain. Checking the names (not just the count) ensures that no virtual
+ // column took the place of a physical one.
+ Assert.assertEquals(newSchema.getColumnNames().size(), 2);
+ Assert.assertTrue(newSchema.getColumnNames().contains("col1"));
+ Assert.assertTrue(newSchema.getColumnNames().contains(newSchema.getTimeFieldSpec().getName()));
+ // The new time field spec is built from the outgoing granularity, so its incoming unit is DAYS.
+ Assert.assertEquals(newSchema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.DAYS);
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index 7803874..8524196 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.integration.tests;
import com.linkedin.pinot.broker.broker.BrokerServerBuilder;
import com.linkedin.pinot.broker.broker.BrokerTestUtils;
import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
+import com.linkedin.pinot.common.config.IndexingConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.config.TableTaskConfig;
@@ -424,13 +425,23 @@ public abstract class ClusterTest extends ControllerTest {
.setTaskConfig(taskConfig)
.build();
+ // Cache the table config regardless of config format so that tests can update it later.
+ _realtimeTableConfig = tableConfig;
+
if (!isUsingNewConfigFormat()) {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
- } else {
- _realtimeTableConfig = tableConfig;
}
}
+ /**
+ * Replaces the inverted-index and bloom-filter columns of the cached realtime table config
+ * ({@code _realtimeTableConfig}) and sends the updated config to the controller with a PUT request.
+ * The cached config is modified in place.
+ *
+ * @param tableName name of the table whose config is updated
+ * @param invertedIndexCols new inverted-index columns
+ * @param bloomFilterCols new bloom-filter columns
+ * @throws IllegalStateException if no realtime table config has been cached yet
+ */
+ protected void updateRealtimeTableConfig(String tableName, List<String> invertedIndexCols,
+ List<String> bloomFilterCols) throws Exception {
+ if (_realtimeTableConfig == null) {
+ throw new IllegalStateException("No realtime table config cached; set up the realtime table before updating it");
+ }
+ IndexingConfig config = _realtimeTableConfig.getIndexingConfig();
+ config.setInvertedIndexColumns(invertedIndexCols);
+ config.setBloomFilterColumns(bloomFilterCols);
+
+ sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName),
+ _realtimeTableConfig.toJSONConfigString());
+ }
+
protected void dropRealtimeTable(String tableName) throws Exception {
sendDeleteRequest(
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index f962747..d50cc71 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -15,15 +15,20 @@
*/
package com.linkedin.pinot.integration.tests;
+import com.google.common.base.Function;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.util.TestUtils;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
+import org.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -33,6 +38,7 @@ import org.testng.annotations.Test;
* Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer.
*/
public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
public static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
public static final long RANDOM_SEED = System.currentTimeMillis();
public static final Random RANDOM = new Random(RANDOM_SEED);
@@ -40,6 +46,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
public final boolean _isDirectAlloc = RANDOM.nextBoolean();
public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ // Filter query used by testInvertedIndexTriggering. Its filter column, DivActualElapsedTime, is one of
+ // UPDATED_INVERTED_INDEX_COLUMNS; the test expects no filter entries to be scanned once that index is built.
+ private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+ "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ // Inverted-index columns written into the table config by testInvertedIndexTriggering before it reloads the table.
+ private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+ Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
+
@BeforeClass
@Override
public void setUp() throws Exception {
@@ -83,6 +94,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
@Test
public void testSegmentFlushSize() throws Exception {
+
String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
for (String segmentName : segmentNames) {
@@ -92,4 +104,34 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
"Segment: " + segmentName + " does not have the expected flush size");
}
}
+
+ /**
+ * Adds DivActualElapsedTime to the inverted-index columns, reloads the realtime table, and waits until the filter
+ * query scans no entries. Total docs must stay constant throughout.
+ */
+ @Test
+ public void testInvertedIndexTriggering() throws Exception {
+
+ final long numTotalDocs = getCountStarResult();
+
+ JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+ Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+ // The filter column is expected to have no inverted index yet, so the filter should scan entries.
+ // assertNotSame(long, 0) would compare a boxed Long with a boxed Integer by reference and always pass.
+ // TODO: investigate why assert for a specific value fails intermittently
+ Assert.assertTrue(queryResponse.getLong("numEntriesScannedInFilter") > 0,
+ "Expected entries to be scanned in filter before the inverted index is added");
+
+ updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
+
+ sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+
+ TestUtils.waitForCondition(new Function<Void, Boolean>() {
+ @Override
+ public Boolean apply(@javax.annotation.Nullable Void aVoid) {
+ try {
+ JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+ // Total docs should not change during reload
+ Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+ return queryResponse.getLong("numEntriesScannedInFilter") == 0;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }, 600_000L, "Failed to generate inverted index");
+ }
}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org