You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/12/14 15:18:00 UTC
[incubator-pinot] 01/01: Fix realtime segment reload
This is an automated email from the ASF dual-hosted git repository.
sunithabeeram pushed a commit to branch virtualColFix
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f6f9c47e1ef978e6d8bdaaf5adda80d7cba3f9ca
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Fri Dec 14 07:17:41 2018 -0800
Fix realtime segment reload
---
.../com/linkedin/pinot/common/data/Schema.java | 15 ++++++++
.../helix/ControllerRequestURLBuilder.java | 5 +++
.../defaultcolumn/BaseDefaultColumnHandler.java | 2 +-
.../pinot/integration/tests/ClusterTest.java | 15 ++++++--
.../tests/LLCRealtimeClusterIntegrationTest.java | 42 ++++++++++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 6 ++--
6 files changed, 79 insertions(+), 6 deletions(-)
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
index 7a6cf7c..f2e6a0c 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java
@@ -30,6 +30,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -268,6 +269,20 @@ public final class Schema {
@JsonIgnore
@Nonnull
+  public Set<String> getPhysicalColumnNames() {
+    // Builds a new set from the keys of _fieldSpecMap, keeping only the
+    // columns for which isVirtualColumn(...) returns false.
+    Set<String> physicalColumns = new HashSet<>();
+    for (String columnName : _fieldSpecMap.keySet()) {
+      if (!isVirtualColumn(columnName)) {
+        physicalColumns.add(columnName);
+      }
+    }
+    return physicalColumns;
+  }
+
+ @JsonIgnore
+ @Nonnull
public Collection<FieldSpec> getAllFieldSpecs() {
return _fieldSpecMap.values();
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
index 9a79cc3..e7dfa77 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -154,6 +154,11 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, query);
}
+  public String forTableReload(String tableName, String tableType) {
+    String base = StringUtils.chomp(_baseUrl, "/"); // same base-URL handling as the neighbouring builders
+    return StringUtil.join("/", base, "tables", tableName, "segments", "reload?type=" + tableType);
+  }
+
public String forTableUpdateIndexingConfigs(String tableName) {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "indexingConfigs");
}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 5abed29..283af0f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -150,7 +150,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
Map<String, DefaultColumnAction> defaultColumnActionMap = new HashMap<>();
// Compute ADD and UPDATE actions.
- Collection<String> columnsInSchema = _schema.getColumnNames();
+ Collection<String> columnsInSchema = _schema.getPhysicalColumnNames();
for (String column : columnsInSchema) {
FieldSpec fieldSpecInSchema = _schema.getFieldSpecFor(column);
Preconditions.checkNotNull(fieldSpecInSchema);
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
index 7803874..8524196 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java
@@ -18,6 +18,7 @@ package com.linkedin.pinot.integration.tests;
import com.linkedin.pinot.broker.broker.BrokerServerBuilder;
import com.linkedin.pinot.broker.broker.BrokerTestUtils;
import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter;
+import com.linkedin.pinot.common.config.IndexingConfig;
import com.linkedin.pinot.common.config.TableConfig;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.config.TableTaskConfig;
@@ -424,13 +425,23 @@ public abstract class ClusterTest extends ControllerTest {
.setTaskConfig(taskConfig)
.build();
+ // save the realtime table config
+ _realtimeTableConfig = tableConfig;
+
if (!isUsingNewConfigFormat()) {
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString());
- } else {
- _realtimeTableConfig = tableConfig;
}
}
+  protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols, List<String> bloomFilterCols) throws Exception {
+    // Edits the indexing section of the saved realtime table config in place, then PUTs the whole config
+    IndexingConfig indexingConfig = _realtimeTableConfig.getIndexingConfig();
+    indexingConfig.setInvertedIndexColumns(invertedIndexCols);
+    indexingConfig.setBloomFilterColumns(bloomFilterCols);
+    String updateUrl = _controllerRequestURLBuilder.forUpdateTableConfig(tablename);
+    sendPutRequest(updateUrl, _realtimeTableConfig.toJSONConfigString());
+  }
+
protected void dropRealtimeTable(String tableName) throws Exception {
sendDeleteRequest(
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)));
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index f962747..d50cc71 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -15,15 +15,20 @@
*/
package com.linkedin.pinot.integration.tests;
+import com.google.common.base.Function;
import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion;
+import com.linkedin.pinot.util.TestUtils;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
import org.apache.helix.ZNRecord;
+import org.json.JSONObject;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -33,6 +38,7 @@ import org.testng.annotations.Test;
* Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer.
*/
public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest {
+
public static final String CONSUMER_DIRECTORY = "/tmp/consumer-test";
public static final long RANDOM_SEED = System.currentTimeMillis();
public static final Random RANDOM = new Random(RANDOM_SEED);
@@ -40,6 +46,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
public final boolean _isDirectAlloc = RANDOM.nextBoolean();
public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean();
+ private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
+ "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+ Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
+
@BeforeClass
@Override
public void setUp() throws Exception {
@@ -83,6 +94,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
@Test
public void testSegmentFlushSize() throws Exception {
+
String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName());
List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0);
for (String segmentName : segmentNames) {
@@ -92,4 +104,34 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
"Segment: " + segmentName + " does not have the expected flush size");
}
}
+
+  @Test
+  public void testInvertedIndexTriggering() throws Exception {
+    final long numTotalDocs = getCountStarResult();
+    // Before the config change the filter is expected to scan entries (non-zero numEntriesScannedInFilter)
+    JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+    Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+    // TODO: investigate why assert for a specific value fails intermittently
+    Assert.assertTrue(queryResponse.getLong("numEntriesScannedInFilter") > 0L,
+        "Expected the filter to scan entries before the inverted index is added");
+
+    // Add the filter column to the inverted index columns, then ask the controller to reload realtime segments
+    updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null);
+    sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null);
+
+    // Wait until the same query reports zero entries scanned in the filter
+    TestUtils.waitForCondition(new Function<Void, Boolean>() {
+      @Override
+      public Boolean apply(@javax.annotation.Nullable Void aVoid) {
+        try {
+          JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
+          // Total docs should not change during reload
+          Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs);
+          return queryResponse.getLong("numEntriesScannedInFilter") == 0;
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }, 600_000L, "Failed to generate inverted index");
+  }
}
+
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 4e5febf..ad8d86f 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -50,10 +50,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final int NUM_SEGMENTS = 12;
// For inverted index triggering test
- private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
- Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
"SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
+ private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS =
+ Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime");
private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier");
private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
@@ -200,7 +200,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
throw new RuntimeException(e);
}
}
- }, 600_000L, "Failed to generate inverted index");
+ }, 600_000L, "Failed to generate bloomfilter");
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org