You are viewing a plain text version of this content. The canonical link to it (a "here" hyperlink in the original page) is not preserved in this text version.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/12/15 02:05:41 UTC

[incubator-pinot] branch master updated: Filter out virtual-columns in realtime segment conversion path (#3607)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f26e482  Filter out virtual-columns in realtime segment conversion path (#3607)
f26e482 is described below

commit f26e48284396f9b6a5f61bff06ced71d7e579aaf
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Dec 14 18:05:36 2018 -0800

    Filter out virtual-columns in realtime segment conversion path (#3607)
    
    * Fix VirtualColumns being written out while segment is built
---
 .../com/linkedin/pinot/common/data/Schema.java     | 15 +++++++
 .../helix/ControllerRequestURLBuilder.java         |  5 +++
 .../immutable/ImmutableSegmentLoader.java          |  9 ++--
 .../converter/RealtimeSegmentConverter.java        | 40 ++++++++++-------
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  2 +-
 .../converter/RealtimeSegmentConverterTest.java    | 50 ++++++++++++++++++++++
 .../pinot/integration/tests/ClusterTest.java       | 15 ++++++-
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 42 ++++++++++++++++++
 8 files changed, 154 insertions(+), 24 deletions(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
index 7a6cf7c..f2e6a0c 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -268,6 +269,20 @@ public final class Schema {
 
   @JsonIgnore
   @Nonnull
+  public Set<String> getPhysicalColumnNames() {
+    // Virtual columns are added to the schema when a segment is loaded, so they must be left out wherever only
+    // physical columns are wanted, e.g. when building or converting a segment.
+    Set<String> cols = new HashSet<>();
+    for (String col : _fieldSpecMap.keySet()) {
+      if (!isVirtualColumn(col)) {
+        cols.add(col);
+      }
+    }
+    return cols;
+  }
+
+  @JsonIgnore
+  @Nonnull
   public Collection<FieldSpec> getAllFieldSpecs() {
     return _fieldSpecMap.values();
   }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..e7dfa77 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -154,6 +154,11 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, query);
   }
 
+  public String forTableReload(String tableName, String tableType) {
+    String controllerBase = StringUtils.chomp(_baseUrl, "/"); // drop a trailing '/' before joining path parts
+    return StringUtil.join("/", controllerBase, "tables", tableName, "segments", "reload?type=" + tableType);
+  }
+
   public String forTableUpdateIndexingConfigs(String tableName) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "indexingConfigs");
   }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index 64ea1af..b1313bf 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -113,15 +113,12 @@ public class ImmutableSegmentLoader {
           new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig));
     }
 
-    // Synthesize schema if necessary, adding virtual columns
-    if (schema != null) {
-      VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
-    } else {
+    if (schema == null) {
       schema = segmentMetadata.getSchema();
     }
 
-    // Ensure that the schema in the segment metadata also has the virtual columns added
-    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(segmentMetadata.getSchema());
+    // Ensure that the schema has the virtual columns added
+    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
 
     // Instantiate virtual columns
     for (String columnName : schema.getColumnNames()) {
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 4608e03..af674b1 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.core.realtime.converter;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.config.ColumnPartitionConfig;
 import com.linkedin.pinot.common.config.SegmentPartitionConfig;
 import com.linkedin.pinot.common.data.FieldSpec;
@@ -57,28 +58,14 @@ public class RealtimeSegmentConverter {
     if (new File(outputPath).exists()) {
       throw new IllegalAccessError("path already exists:" + outputPath);
     }
-    TimeFieldSpec original = schema.getTimeFieldSpec();
-    // Use outgoing granularity for creating segment
-    TimeGranularitySpec outgoing = original.getOutgoingGranularitySpec();
-
-    TimeFieldSpec newTimeSpec = new TimeFieldSpec(outgoing);
 
-    Schema newSchema = new Schema();
-    for (String dimension : schema.getDimensionNames()) {
-      newSchema.addField(schema.getFieldSpecFor(dimension));
-    }
-    for (String metric : schema.getMetricNames()) {
-      newSchema.addField(schema.getFieldSpecFor(metric));
-    }
-
-    newSchema.addField(newTimeSpec);
     this.realtimeSegmentImpl = realtimeSegment;
     this.outputPath = outputPath;
     this.invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
     if (sortedColumn != null && this.invertedIndexColumns.contains(sortedColumn)) {
       this.invertedIndexColumns.remove(sortedColumn);
     }
-    this.dataSchema = newSchema;
+    this.dataSchema = getUpdatedSchema(schema);
     this.sortedColumn = sortedColumn;
     this.tableName = tableName;
     this.segmentName = segmentName;
@@ -150,4 +137,27 @@ public class RealtimeSegmentConverter {
       }
     }
   }
+
+  /**
+   * Builds the schema used to write out the converted segment: physical columns only (virtual columns, such as the
+   * built-in ones added by VirtualColumnProviderFactory, are dropped), with the time column replaced by a new
+   * TimeFieldSpec built from the original's outgoing granularity spec. {@code original} must have a time field spec.
+   */
+  @VisibleForTesting
+  public Schema getUpdatedSchema(Schema original) {
+    TimeFieldSpec originalTimeSpec = original.getTimeFieldSpec();
+    // Use outgoing granularity for creating segment
+    TimeGranularitySpec outgoingGranularity = originalTimeSpec.getOutgoingGranularitySpec();
+    Schema updated = new Schema();
+    updated.addField(new TimeFieldSpec(outgoingGranularity));
+    // The original time column is superseded by the outgoing time spec added above.
+    String timeColumn = originalTimeSpec.getName();
+    for (String column : original.getPhysicalColumnNames()) {
+      if (column.equals(timeColumn)) {
+        continue;
+      }
+      updated.addField(original.getFieldSpecFor(column));
+    }
+    return updated;
+  }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 5abed29..283af0f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -150,7 +150,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     Map<String, DefaultColumnAction> defaultColumnActionMap = new HashMap<>();
 
     // Compute ADD and UPDATE actions.
-    Collection<String> columnsInSchema = _schema.getColumnNames();
+    Collection<String> columnsInSchema = _schema.getPhysicalColumnNames();
     for (String column : columnsInSchema) {
       FieldSpec fieldSpecInSchema = _schema.getFieldSpecFor(column);
       Preconditions.checkNotNull(fieldSpecInSchema);
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java
new file mode 100644
index 0000000..2d094ec
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.realtime.converter;
+
+import com.linkedin.pinot.common.data.DimensionFieldSpec;
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.data.TimeFieldSpec;
+import com.linkedin.pinot.core.realtime.converter.RealtimeSegmentConverter;
+import com.linkedin.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import java.util.concurrent.TimeUnit;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class RealtimeSegmentConverterTest {
+
+  @Test
+  public void testNoVirtualColumnsInSchema() {
+    // One dimension column plus a time column whose incoming spec is MILLISECONDS ("col1") and outgoing is DAYS ("col2").
+    Schema schema = new Schema();
+    schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, true));
+    schema.addField(new TimeFieldSpec("col1", FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS,
+        "col2", FieldSpec.DataType.LONG, TimeUnit.DAYS));
+    // Add the built-in virtual columns, which the converted schema must not contain.
+    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSchema(schema);
+    Assert.assertEquals(schema.getColumnNames().size(), 5);
+    Assert.assertEquals(schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.MILLISECONDS);
+
+    RealtimeSegmentConverter converter =
+        new RealtimeSegmentConverter(null, "", schema, "testTable", "col1", "segment1", "col1");
+    // Virtual columns are dropped and the time column now uses the outgoing (DAYS) granularity.
+    Schema newSchema = converter.getUpdatedSchema(schema);
+    Assert.assertEquals(newSchema.getColumnNames().size(), 2);
+    Assert.assertEquals(newSchema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(), TimeUnit.DAYS);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index 7803874..8524196 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.integration.tests;
 import com.linkedin.pinot.broker.broker.BrokerServerBuilder;
 import com.linkedin.pinot.broker.broker.BrokerTestUtils;
 import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
+import com.linkedin.pinot.common.config.IndexingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.config.TableTaskConfig;
@@ -424,13 +425,23 @@ public abstract class ClusterTest extends ControllerTest {
         .setTaskConfig(taskConfig)
         .build();
 
+    // save the realtime table config
+    _realtimeTableConfig = tableConfig;
+
     if (!isUsingNewConfigFormat()) {
       sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
-    } else {
-      _realtimeTableConfig = tableConfig;
     }
   }
 
+  protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols,
+      List<String> bloomFilterCols) throws Exception {
+    // Update the saved realtime table config in place, then PUT the whole config to the controller.
+    IndexingConfig indexingConfig = _realtimeTableConfig.getIndexingConfig();
+    indexingConfig.setInvertedIndexColumns(invertedIndexCols);
+    indexingConfig.setBloomFilterColumns(bloomFilterCols);
+    sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tablename), _realtimeTableConfig.toJSONConfigString());
+  }
+
   protected void dropRealtimeTable(String tableName) throws Exception {
     sendDeleteRequest(
         _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index f962747..d50cc71 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -15,15 +15,20 @@
  */
 package com.linkedin.pinot.integration.tests;
 
+import com.google.common.base.Function;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.util.TestUtils;
 import java.io.File;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import org.apache.avro.reflect.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.ZNRecord;
+import org.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -33,6 +38,7 @@ import org.testng.annotations.Test;
  * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer.
  */
 public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
   public static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
   public static final long RANDOM_SEED = System.currentTimeMillis();
   public static final Random RANDOM = new Random(RANDOM_SEED);
@@ -40,6 +46,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
   public final boolean _isDirectAlloc = RANDOM.nextBoolean();
   public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
 
+  private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+      Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
+
   @BeforeClass
   @Override
   public void setUp() throws Exception {
@@ -83,6 +94,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
 
   @Test
   public void testSegmentFlushSize() throws Exception {
+
     String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
     List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
     for (String segmentName : segmentNames) {
@@ -92,4 +104,34 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
           "Segment: " + segmentName + " does not have the expected flush size");
     }
   }
+
+  @Test
+  public void testInvertedIndexTriggering() throws Exception {
+    final long numTotalDocs = getCountStarResult();
+
+    // DivActualElapsedTime is expected to have no inverted index yet, so the filter should scan entries.
+    JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+    Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+    // TODO: investigate why assert for a specific value fails intermittently
+    Assert.assertTrue(queryResponse.getLong("numEntriesScannedInFilter") > 0L, "Expected filter to scan entries before reload");
+
+    updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
+    // Reload the realtime segments so the newly configured inverted index columns take effect.
+    sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Override
+      public Boolean apply(@javax.annotation.Nullable Void aVoid) {
+        try {
+          JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+          // Total docs should not change during reload
+          Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+          return queryResponse.getLong("numEntriesScannedInFilter") == 0;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }, 600_000L, "Failed to generate inverted index");
+  }
 }
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org