You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/10 20:17:33 UTC
[incubator-pinot] branch master updated: Adding offline dim table
creation and assignment (#6286)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 64d1054 Adding offline dim table creation and assignment (#6286)
64d1054 is described below
commit 64d1054e9f2d6f616fd1baee678a35ec60d5bcf5
Author: Dharak Kharod <dh...@uber.com>
AuthorDate: Thu Dec 10 12:17:13 2020 -0800
Adding offline dim table creation and assignment (#6286)
Add creation and segment assignment for dimension (dim) tables. The high-level changes are:
- Add a new table config property `isDimTable` to identify a dim table
- If the table is identified as a dim table, its segments are assigned to all the hosts (servers) under the table's offline server tag
---
.../common/utils/config/TableConfigUtils.java | 4 +-
.../segment/OfflineDimTableSegmentAssignment.java | 93 ++++++++++++++++++
.../segment/SegmentAssignmentFactory.java | 2 +-
.../assignment/segment/SegmentAssignmentUtils.java | 1 -
.../OfflineDimTableSegmentAssignmentTest.java | 108 +++++++++++++++++++++
.../manager/config/TableDataManagerConfig.java | 8 +-
.../apache/pinot/spi/config/table/TableConfig.java | 13 ++-
.../spi/utils/builder/TableConfigBuilder.java | 8 +-
8 files changed, 231 insertions(+), 6 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 1685e74..508ce54 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -58,6 +58,7 @@ public class TableConfigUtils {
String tableName = znRecord.getId();
String tableType = simpleFields.get(TableConfig.TABLE_TYPE_KEY);
+ boolean isDimTable = Boolean.parseBoolean(simpleFields.get(TableConfig.IS_DIM_TABLE_KEY));
Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);
String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
@@ -140,7 +141,7 @@ public class TableConfigUtils {
return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
- ingestionConfig, tierConfigList);
+ ingestionConfig, tierConfigList, isDimTable);
}
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -154,6 +155,7 @@ public class TableConfigUtils {
simpleFields.put(TableConfig.TENANT_CONFIG_KEY, tableConfig.getTenantConfig().toJsonString());
simpleFields.put(TableConfig.INDEXING_CONFIG_KEY, tableConfig.getIndexingConfig().toJsonString());
simpleFields.put(TableConfig.CUSTOM_CONFIG_KEY, tableConfig.getCustomConfig().toJsonString());
+ simpleFields.put(TableConfig.IS_DIM_TABLE_KEY, Boolean.toString(tableConfig.isDimTable()));
// Optional fields
QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java
new file mode 100644
index 0000000..a14abbe
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Segment assignment for an offline dimension table.
+ * <ul>
+ *   <li>
+ *     <p>This segment assignment strategy is used when {@link TableConfig#IS_DIM_TABLE_KEY} is
+ *     set to "true".</p>
+ *   </li>
+ *   <li>
+ *     <p>For a dimension table we assign every segment to all the hosts. Thus for this assignment
+ *     strategy we simply return all the hosts under the table's offline server tag as the assigned
+ *     hosts for a given segment.</p>
+ *   </li>
+ * </ul>
+ */
+public class OfflineDimTableSegmentAssignment implements SegmentAssignment {
+
+  private HelixManager _helixManager;
+  private String _offlineTableName;
+  private TenantConfig _tenantConfig;
+
+  /**
+   * Initializes the assignment for the given table.
+   *
+   * @param helixManager Helix manager used to look up server instances by tag
+   * @param tableConfig config of the table, which must be marked as a dimension table
+   * @throws IllegalStateException if the table config is not marked as a dimension table
+   */
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    // Take the name from the config: _offlineTableName is not assigned yet at this point. The name is passed as a
+    // template argument (not concatenated) so that %s is actually substituted.
+    Preconditions.checkState(tableConfig.isDimTable(), "Not a dimension table: %s", tableConfig.getTableName());
+    _helixManager = helixManager;
+    _offlineTableName = tableConfig.getTableName();
+    _tenantConfig = tableConfig.getTenantConfig();
+  }
+
+  /**
+   * Assigns the segment to every server instance carrying the table's offline server tag. The current assignment
+   * and the instance partitions are not used.
+   *
+   * @return all instances carrying the table's offline server tag
+   * @throws IllegalStateException if no instance carries the offline server tag
+   */
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    return getServerInstances();
+  }
+
+  /**
+   * Reassigns every segment in the current assignment to all server instances carrying the table's offline server
+   * tag, each in ONLINE state. Instance partitions, tiers and the rebalance config are not used.
+   *
+   * @return the new assignment, keyed by segment name
+   * @throws IllegalStateException if no instance carries the offline server tag
+   */
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    // Fail fast (like assignSegment) instead of producing an assignment that removes every segment from every server
+    List<String> instances = getServerInstances();
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (String segment : currentAssignment.keySet()) {
+      newAssignment.put(segment, SegmentAssignmentUtils
+          .getInstanceStateMap(instances, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+    }
+    return newAssignment;
+  }
+
+  /**
+   * Returns the instances carrying the table's offline server tag.
+   *
+   * @throws IllegalStateException if no instance carries the tag
+   */
+  private List<String> getServerInstances() {
+    String serverTag = TagNameUtils.extractOfflineServerTag(_tenantConfig);
+    List<String> instances = HelixHelper.getInstancesWithTag(_helixManager, serverTag);
+    Preconditions.checkState(!instances.isEmpty(), "No instance found with tag: %s", serverTag);
+    return instances;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index a49f7b9..2a24066 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -33,7 +33,7 @@ public class SegmentAssignmentFactory {
public static SegmentAssignment getSegmentAssignment(HelixManager helixManager, TableConfig tableConfig) {
SegmentAssignment segmentAssignment;
if (tableConfig.getTableType() == TableType.OFFLINE) {
- segmentAssignment = new OfflineSegmentAssignment();
+ segmentAssignment = tableConfig.isDimTable() ? new OfflineDimTableSegmentAssignment() : new OfflineSegmentAssignment();
} else {
segmentAssignment = new RealtimeSegmentAssignment();
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 26b20b2..ed769df 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -31,7 +31,6 @@ import java.util.TreeMap;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.tier.Tier;
-import org.apache.pinot.common.tier.TierSegmentSelector;
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.common.utils.Pairs;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java
new file mode 100644
index 0000000..2876246
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.helix.model.InstanceConfig.InstanceConfigProperty.TAG_LIST;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link OfflineDimTableSegmentAssignment}: factory selection, assignment to all tagged servers, and
+ * rebalance after a server loses the offline tag.
+ */
+public class OfflineDimTableSegmentAssignmentTest {
+  private static final String INSTANCE_NAME_PREFIX = "instance_";
+  private static final int NUM_INSTANCES = 10;
+  private static final List<String> INSTANCES =
+      SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES);
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_SERVER_TAG = "DefaultTenant_OFFLINE";
+  private static final String REALTIME_SERVER_TAG = "DefaultTenant_REALTIME";
+  private static final String BROKER_TAG = "DefaultTenant_Broker";
+  private static final String SEGMENT_NAME = "segment1";
+
+  private SegmentAssignment _segmentAssignment;
+  private HelixManager _helixManager;
+
+  @BeforeClass
+  public void setup() {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIsDimTable(true).build();
+
+    _helixManager = mock(HelixManager.class);
+    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+  }
+
+  @Test
+  public void testFactory() {
+    assertTrue(_segmentAssignment instanceof OfflineDimTableSegmentAssignment);
+  }
+
+  @Test
+  public void testSegmentAssignmentAndRebalance() {
+    // Every instance starts out with both the offline and the realtime server tags
+    List<HelixProperty> instanceConfigList = new ArrayList<>();
+    for (String instance : INSTANCES) {
+      ZNRecord znRecord = new ZNRecord(instance);
+      znRecord.setListField(TAG_LIST.name(), ImmutableList.of(OFFLINE_SERVER_TAG, REALTIME_SERVER_TAG));
+      instanceConfigList.add(new InstanceConfig(znRecord));
+    }
+    HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+    PropertyKey.Builder builder = new PropertyKey.Builder("cluster");
+    when(dataAccessor.keyBuilder()).thenReturn(builder);
+    when(dataAccessor.getChildValues(builder.instanceConfigs(), true)).thenReturn(instanceConfigList);
+    when(_helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
+
+    List<String> instances = _segmentAssignment.assignSegment(SEGMENT_NAME, new TreeMap<>(), new TreeMap<>());
+    assertEquals(instances.size(), NUM_INSTANCES);
+    assertEqualsNoOrder(instances.toArray(), INSTANCES.toArray());
+
+    // Use the assignment above as the current assignment
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    Map<String, String> segment1Assignment = new TreeMap<>();
+    instances.forEach(instance -> segment1Assignment.put(instance, "ONLINE"));
+    currentAssignment.put(SEGMENT_NAME, segment1Assignment);
+
+    // Re-tag the first instance as a broker so it no longer carries the offline server tag, then rebalance
+    String removedInstance = instanceConfigList.get(0).getId();
+    ZNRecord znRecord = new ZNRecord(removedInstance);
+    znRecord.setListField(TAG_LIST.name(), ImmutableList.of(BROKER_TAG));
+    instanceConfigList.set(0, new InstanceConfig(znRecord));
+    when(dataAccessor.getChildValues(builder.instanceConfigs(), true)).thenReturn(instanceConfigList);
+
+    Map<String, Map<String, String>> newAssignment =
+        _segmentAssignment.rebalanceTable(currentAssignment, new TreeMap<>(), null, null, null);
+    Map<String, String> newInstanceStateMap = newAssignment.get(SEGMENT_NAME);
+    assertEquals(newInstanceStateMap.size(), NUM_INSTANCES - 1);
+
+    // The re-tagged instance must be the one that was dropped, and all remaining replicas must be ONLINE
+    List<String> expectedInstances = new ArrayList<>(INSTANCES);
+    expectedInstances.remove(removedInstance);
+    assertEqualsNoOrder(newInstanceStateMap.keySet().toArray(), expectedInstances.toArray());
+    newInstanceStateMap.values().forEach(state -> assertEquals(state, "ONLINE"));
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
index 5fe390b..ca0de52 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
@@ -35,6 +35,7 @@ public class TableDataManagerConfig {
private static final String TABLE_DATA_MANAGER_DATA_DIRECTORY = "directory";
private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY = "consumerDirectory";
private static final String TABLE_DATA_MANAGER_NAME = "name";
+ private static final String TABLE_IS_DIMENSION = "isDimTable";
private final Configuration _tableDataManagerConfig;
@@ -62,6 +63,10 @@ public class TableDataManagerConfig {
return _tableDataManagerConfig.getString(TABLE_DATA_MANAGER_NAME);
}
+  /**
+   * Returns whether the table is a dimension table.
+   *
+   * <p>Returns {@code false} when the property has not been set on this config (e.g. if {@link #overrideConfigs} has
+   * not been applied), instead of failing: the single-argument {@code getBoolean(String)} throws when the key is
+   * missing.
+   */
+  public boolean isDimTable() {
+    return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION, false);
+  }
+
public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
@Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull String tableNameWithType) {
Configuration defaultConfig = new PropertiesConfiguration();
@@ -79,7 +84,8 @@ public class TableDataManagerConfig {
public void overrideConfigs(@Nonnull TableConfig tableConfig) {
// Override table level configs
- // Currently we do not override any table level configs into TableDataManagerConfig
+ _tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION, tableConfig.isDimTable());
+
// If we wish to override some table level configs using table config, override them here
// Note: the configs in TableDataManagerConfig is immutable once the table is created, which mean it will not pick
// up the latest table config
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index f3a6994..537009f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class TableConfig extends BaseJsonConfig {
public static final String TABLE_NAME_KEY = "tableName";
public static final String TABLE_TYPE_KEY = "tableType";
+ public static final String IS_DIM_TABLE_KEY = "isDimTable";
public static final String VALIDATION_CONFIG_KEY = "segmentsConfig";
public static final String TENANT_CONFIG_KEY = "tenants";
public static final String INDEXING_CONFIG_KEY = "tableIndexConfig";
@@ -62,6 +63,9 @@ public class TableConfig extends BaseJsonConfig {
@JsonPropertyDescription(value = "The type of the table (OFFLINE|REALTIME) (mandatory)")
private final TableType _tableType;
+ @JsonPropertyDescription("Indicates whether the table is a dimension table or not")
+ private final boolean _dimTable;
+
private SegmentsValidationAndRetentionConfig _validationConfig;
private TenantConfig _tenantConfig;
private IndexingConfig _indexingConfig;
@@ -104,7 +108,8 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig> fieldConfigList,
@JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
@JsonProperty(INGESTION_CONFIG_KEY) @Nullable IngestionConfig ingestionConfig,
- @JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList) {
+ @JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList,
+ @JsonProperty(IS_DIM_TABLE_KEY) boolean dimTable) {
Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
"'tableName' cannot contain double underscore ('__')");
@@ -130,6 +135,7 @@ public class TableConfig extends BaseJsonConfig {
_upsertConfig = upsertConfig;
_ingestionConfig = ingestionConfig;
_tierConfigsList = tierConfigsList;
+ _dimTable = dimTable;
}
@JsonProperty(TABLE_NAME_KEY)
@@ -142,6 +148,11 @@ public class TableConfig extends BaseJsonConfig {
return _tableType;
}
+  /**
+   * Whether this table is a dimension table; serialized under {@link #IS_DIM_TABLE_KEY}.
+   */
+  @JsonProperty(IS_DIM_TABLE_KEY)
+  public boolean isDimTable() {
+    return this._dimTable;
+  }
+
@JsonProperty(VALIDATION_CONFIG_KEY)
public SegmentsValidationAndRetentionConfig getValidationConfig() {
return _validationConfig;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 3a762a5..52f05b4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -55,6 +55,7 @@ public class TableConfigBuilder {
private final TableType _tableType;
private String _tableName;
+ private boolean _isDimTable;
private boolean _isLLC;
// Segments config related
@@ -114,6 +115,11 @@ public class TableConfigBuilder {
return this;
}
+  /**
+   * Marks the table being built as a dimension table (or not).
+   *
+   * @param isDimTable whether the table is a dimension table
+   * @return this builder, for chaining
+   */
+  public TableConfigBuilder setIsDimTable(boolean isDimTable) {
+    this._isDimTable = isDimTable;
+    return this;
+  }
+
public TableConfigBuilder setLLC(boolean isLLC) {
Preconditions.checkState(_tableType == TableType.REALTIME);
_isLLC = isLLC;
@@ -380,6 +386,6 @@ public class TableConfigBuilder {
return new TableConfig(_tableName, _tableType.toString(), validationConfig, tenantConfig, indexingConfig,
_customConfig, _quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap,
- _fieldConfigList, _upsertConfig, _ingestionConfig, _tierConfigList);
+ _fieldConfigList, _upsertConfig, _ingestionConfig, _tierConfigList, _isDimTable);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org