You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/12/14 15:18:00 UTC

[incubator-pinot] 01/01: Fix realtime segment reload

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch virtualColFix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f6f9c47e1ef978e6d8bdaaf5adda80d7cba3f9ca
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Dec 14 07:17:41 2018 -0800

    Fix realtime segment reload
---
 .../com/linkedin/pinot/common/data/Schema.java     | 15 ++++++++
 .../helix/ControllerRequestURLBuilder.java         |  5 +++
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  2 +-
 .../pinot/integration/tests/ClusterTest.java       | 15 ++++++--
 .../tests/LLCRealtimeClusterIntegrationTest.java   | 42 ++++++++++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |  6 ++--
 6 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
index 7a6cf7c..f2e6a0c 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -268,6 +269,20 @@ public final class Schema {
 
   @JsonIgnore
   @Nonnull
+  public Set<String> getPhysicalColumnNames() {
+    // Build a fresh set holding every schema column that is not virtual
+    Set<String> physicalColumns = new HashSet<>();
+    for (String column : _fieldSpecMap.keySet()) {
+      if (isVirtualColumn(column)) {
+        continue;
+      }
+      physicalColumns.add(column);
+    }
+    return physicalColumns;
+  }
+
+  @JsonIgnore
+  @Nonnull
   public Collection<FieldSpec> getAllFieldSpecs() {
     return _fieldSpecMap.values();
   }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..e7dfa77 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -154,6 +154,11 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, query);
   }
 
+  public String forTableReload(String tableName, String tableType) {
+    // Joins to <baseUrl>/tables/<tableName>/segments/reload?type=<tableType>
+    String reloadWithType = "reload?type=" + tableType;
+    return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", reloadWithType);
+
   public String forTableUpdateIndexingConfigs(String tableName) {
     return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "indexingConfigs");
   }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 5abed29..283af0f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -150,7 +150,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     Map<String, DefaultColumnAction> defaultColumnActionMap = new HashMap<>();
 
     // Compute ADD and UPDATE actions.
-    Collection<String> columnsInSchema = _schema.getColumnNames();
+    Collection<String> columnsInSchema = _schema.getPhysicalColumnNames();
     for (String column : columnsInSchema) {
       FieldSpec fieldSpecInSchema = _schema.getFieldSpecFor(column);
       Preconditions.checkNotNull(fieldSpecInSchema);
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index 7803874..8524196 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.integration.tests;
 import com.linkedin.pinot.broker.broker.BrokerServerBuilder;
 import com.linkedin.pinot.broker.broker.BrokerTestUtils;
 import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
+import com.linkedin.pinot.common.config.IndexingConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.config.TableTaskConfig;
@@ -424,13 +425,23 @@ public abstract class ClusterTest extends ControllerTest {
         .setTaskConfig(taskConfig)
         .build();
 
+    // save the realtime table config
+    _realtimeTableConfig = tableConfig;
+
     if (!isUsingNewConfigFormat()) {
       sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
-    } else {
-      _realtimeTableConfig = tableConfig;
     }
   }
 
+  protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols, List<String> bloomFilterCols) throws Exception {
+    // Set the new index columns on the saved realtime table config, then PUT the whole config to the controller
+    IndexingConfig indexingConfig = _realtimeTableConfig.getIndexingConfig();
+    indexingConfig.setInvertedIndexColumns(invertedIndexCols);
+    indexingConfig.setBloomFilterColumns(bloomFilterCols);
+    String updateUrl = _controllerRequestURLBuilder.forUpdateTableConfig(tablename);
+    sendPutRequest(updateUrl, _realtimeTableConfig.toJSONConfigString());
+  }
+
   protected void dropRealtimeTable(String tableName) throws Exception {
     sendDeleteRequest(
         _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index f962747..d50cc71 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -15,15 +15,20 @@
  */
 package com.linkedin.pinot.integration.tests;
 
+import com.google.common.base.Function;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.util.TestUtils;
 import java.io.File;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import org.apache.avro.reflect.Nullable;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.ZNRecord;
+import org.json.JSONObject;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -33,6 +38,7 @@ import org.testng.annotations.Test;
  * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer.
  */
 public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
   public static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
   public static final long RANDOM_SEED = System.currentTimeMillis();
   public static final Random RANDOM = new Random(RANDOM_SEED);
@@ -40,6 +46,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
   public final boolean _isDirectAlloc = RANDOM.nextBoolean();
   public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
 
+  private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+      Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
+
   @BeforeClass
   @Override
   public void setUp() throws Exception {
@@ -83,6 +94,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
 
   @Test
   public void testSegmentFlushSize() throws Exception {
+
     String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
     List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
     for (String segmentName : segmentNames) {
@@ -92,4 +104,34 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
           "Segment: " + segmentName + " does not have the expected flush size");
     }
   }
+
+  /** Checks that adding an inverted index column and reloading the realtime table stops the filter scan. */
+  @Test
+  public void testInvertedIndexTriggering() throws Exception {
+    final long numTotalDocs = getCountStarResult();
+
+    // Before the config update the query is expected to scan entries in the filter
+    JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+    Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+    // TODO: investigate why assert for a specific value fails intermittently
+    Assert.assertTrue(queryResponse.getLong("numEntriesScannedInFilter") > 0);
+
+    // Push the updated inverted index columns, then ask the controller to reload the realtime table
+    updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
+    sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Override
+      public Boolean apply(@javax.annotation.Nullable Void aVoid) {
+        try {
+          JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+          // Total docs should not change during reload
+          Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+          return queryResponse.getLong("numEntriesScannedInFilter") == 0;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }, 600_000L, "Failed to generate inverted index");
+  }
 }
+
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 4e5febf..ad8d86f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -50,10 +50,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final int NUM_SEGMENTS = 12;
 
   // For inverted index triggering test
-  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
-      Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+  private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+      Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
 
   private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier");
   private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
@@ -200,7 +200,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
           throw new RuntimeException(e);
         }
       }
-    }, 600_000L, "Failed to generate inverted index");
+    }, 600_000L, "Failed to generate bloomfilter");
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org