You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/10 20:17:33 UTC

[incubator-pinot] branch master updated: Adding offline dim table creation and assignment (#6286)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 64d1054  Adding offline dim table creation and assignment (#6286)
64d1054 is described below

commit 64d1054e9f2d6f616fd1baee678a35ec60d5bcf5
Author: Dharak Kharod <dh...@uber.com>
AuthorDate: Thu Dec 10 12:17:13 2020 -0800

    Adding offline dim table creation and assignment (#6286)
    
    Add creation and segment assignment of a dimension ("dim") table. The high-level changes are:
    - A new table config property `isDimTable` identifies a dim table
    - If a table is identified as a dim table, its segments are assigned to all hosts carrying the table's offline server tag
---
 .../common/utils/config/TableConfigUtils.java      |   4 +-
 .../segment/OfflineDimTableSegmentAssignment.java  |  93 ++++++++++++++++++
 .../segment/SegmentAssignmentFactory.java          |   2 +-
 .../assignment/segment/SegmentAssignmentUtils.java |   1 -
 .../OfflineDimTableSegmentAssignmentTest.java      | 108 +++++++++++++++++++++
 .../manager/config/TableDataManagerConfig.java     |   8 +-
 .../apache/pinot/spi/config/table/TableConfig.java |  13 ++-
 .../spi/utils/builder/TableConfigBuilder.java      |   8 +-
 8 files changed, 231 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 1685e74..508ce54 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -58,6 +58,7 @@ public class TableConfigUtils {
     String tableName = znRecord.getId();
 
     String tableType = simpleFields.get(TableConfig.TABLE_TYPE_KEY);
+    boolean isDimTable = Boolean.parseBoolean(simpleFields.get(TableConfig.IS_DIM_TABLE_KEY));
     Preconditions.checkState(tableType != null, FIELD_MISSING_MESSAGE_TEMPLATE, TableConfig.TABLE_TYPE_KEY);
 
     String validationConfigString = simpleFields.get(TableConfig.VALIDATION_CONFIG_KEY);
@@ -140,7 +141,7 @@ public class TableConfigUtils {
 
     return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
         quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
-        ingestionConfig, tierConfigList);
+        ingestionConfig, tierConfigList, isDimTable);
   }
 
   public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -154,6 +155,7 @@ public class TableConfigUtils {
     simpleFields.put(TableConfig.TENANT_CONFIG_KEY, tableConfig.getTenantConfig().toJsonString());
     simpleFields.put(TableConfig.INDEXING_CONFIG_KEY, tableConfig.getIndexingConfig().toJsonString());
     simpleFields.put(TableConfig.CUSTOM_CONFIG_KEY, tableConfig.getCustomConfig().toJsonString());
+    simpleFields.put(TableConfig.IS_DIM_TABLE_KEY, Boolean.toString(tableConfig.isDimTable()));
 
     // Optional fields
     QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java
new file mode 100644
index 0000000..a14abbe
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignment.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.configuration.Configuration;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.tier.Tier;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+/**
+ * Segment assignment for an offline dimension table.
+ * <p>This strategy is used when {@link TableConfig#IS_DIM_TABLE_KEY} is set to "true". Every segment is assigned
+ * to all instances carrying the table's offline server tag, both when a segment is added ({@link #assignSegment})
+ * and when the table is rebalanced ({@link #rebalanceTable}). Instance partitions, tiers and the rebalance
+ * configuration are not consulted.
+ */
+public class OfflineDimTableSegmentAssignment implements SegmentAssignment {
+
+  private HelixManager _helixManager;
+  private TenantConfig _tenantConfig;
+
+  @Override
+  public void init(HelixManager helixManager, TableConfig tableConfig) {
+    // Take the name from the config (no field is populated yet) and let Guava fill the "%s" placeholder
+    Preconditions.checkState(tableConfig.isDimTable(), "Not a dimension table: %s", tableConfig.getTableName());
+    _helixManager = helixManager;
+    _tenantConfig = tableConfig.getTenantConfig();
+  }
+
+  /** Assigns the segment to every instance carrying the table's offline server tag. */
+  @Override
+  public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
+    return getOfflineServerInstances();
+  }
+
+  /** Reassigns every segment in {@code currentAssignment} to all offline server instances in ONLINE state. */
+  @Override
+  public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment,
+      Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, @Nullable List<Tier> sortedTiers,
+      @Nullable Map<String, InstancePartitions> tierInstancePartitionsMap, Configuration config) {
+    List<String> instances = getOfflineServerInstances();
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    for (String segment : currentAssignment.keySet()) {
+      newAssignment.put(segment, SegmentAssignmentUtils
+          .getInstanceStateMap(instances, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE));
+    }
+    return newAssignment;
+  }
+
+  /**
+   * Returns all instances tagged with the table's offline server tag.
+   *
+   * @throws IllegalStateException if no instance carries the tag; rebalancing onto an empty instance list
+   *     would otherwise assign every segment to no instance at all
+   */
+  private List<String> getOfflineServerInstances() {
+    String serverTag = TagNameUtils.extractOfflineServerTag(_tenantConfig);
+    List<String> instances = HelixHelper.getInstancesWithTag(_helixManager, serverTag);
+    Preconditions.checkState(!instances.isEmpty(), "No instance found with tag: %s", serverTag);
+    return instances;
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
index a49f7b9..2a24066 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentFactory.java
@@ -33,7 +33,7 @@ public class SegmentAssignmentFactory {
   public static SegmentAssignment getSegmentAssignment(HelixManager helixManager, TableConfig tableConfig) {
     SegmentAssignment segmentAssignment;
     if (tableConfig.getTableType() == TableType.OFFLINE) {
-      segmentAssignment = new OfflineSegmentAssignment();
+      segmentAssignment = tableConfig.isDimTable() ? new OfflineDimTableSegmentAssignment() : new OfflineSegmentAssignment();
     } else {
       segmentAssignment = new RealtimeSegmentAssignment();
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 26b20b2..ed769df 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -31,7 +31,6 @@ import java.util.TreeMap;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.tier.Tier;
-import org.apache.pinot.common.tier.TierSegmentSelector;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.common.utils.Pairs;
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java
new file mode 100644
index 0000000..2876246
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineDimTableSegmentAssignmentTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.helix.model.InstanceConfig.InstanceConfigProperty.TAG_LIST;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertTrue;
+
+
+public class OfflineDimTableSegmentAssignmentTest {
+  private static final String INSTANCE_NAME_PREFIX = "instance_";
+  private static final int NUM_INSTANCES = 10;
+  private static final List<String> INSTANCES =
+      SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES);
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_SERVER_TAG = "DefaultTenant_OFFLINE";
+  private static final String REALTIME_SERVER_TAG = "DefaultTenant_REALTIME";
+  private static final String BROKER_TAG = "DefaultTenant_Broker";
+  private static final String SEGMENT_NAME = "segment1";
+
+  private SegmentAssignment _segmentAssignment;
+  private HelixManager _helixManager;
+
+  @BeforeClass
+  public void setup() {
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIsDimTable(true).build();
+
+    _helixManager = mock(HelixManager.class);
+    _segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
+  }
+
+  @Test
+  public void testFactory() {
+    assertTrue(_segmentAssignment instanceof OfflineDimTableSegmentAssignment);
+  }
+
+  @Test
+  public void testSegmentAssignmentAndRebalance() {
+    List<HelixProperty> instanceConfigList = new ArrayList<>();
+    for (String instance : INSTANCES) {
+      ZNRecord znRecord = new ZNRecord(instance);
+      znRecord.setListField(TAG_LIST.name(), ImmutableList.of(OFFLINE_SERVER_TAG, REALTIME_SERVER_TAG));
+      instanceConfigList.add(new InstanceConfig(znRecord));
+    }
+    HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class);
+    PropertyKey.Builder builder = new PropertyKey.Builder("cluster");
+    when(dataAccessor.keyBuilder()).thenReturn(builder);
+    when(dataAccessor.getChildValues(builder.instanceConfigs(), true)).thenReturn(instanceConfigList);
+    when(_helixManager.getHelixDataAccessor()).thenReturn(dataAccessor);
+
+    // Every instance carries the offline server tag, so a new segment goes to all of them
+    List<String> instances = _segmentAssignment.assignSegment(SEGMENT_NAME, new TreeMap<>(), new TreeMap<>());
+    assertEqualsNoOrder(instances.toArray(), INSTANCES.toArray());
+
+    // Re-tag the first instance as a broker so it loses the offline server tag; rebalance must drop it
+    Map<String, Map<String, String>> currentAssignment = new TreeMap<>();
+    Map<String, String> instanceStateMap = new TreeMap<>();
+    instances.forEach(instance -> instanceStateMap.put(instance, "ONLINE"));
+    currentAssignment.put(SEGMENT_NAME, instanceStateMap);
+    ZNRecord brokerZnRecord = new ZNRecord(INSTANCES.get(0));
+    brokerZnRecord.setListField(TAG_LIST.name(), ImmutableList.of(BROKER_TAG));
+    instanceConfigList.set(0, new InstanceConfig(brokerZnRecord));
+    when(dataAccessor.getChildValues(builder.instanceConfigs(), true)).thenReturn(instanceConfigList);
+
+    Map<String, String> newInstanceStateMap =
+        _segmentAssignment.rebalanceTable(currentAssignment, new TreeMap<>(), null, null, null).get(SEGMENT_NAME);
+    assertEqualsNoOrder(newInstanceStateMap.keySet().toArray(), INSTANCES.subList(1, NUM_INSTANCES).toArray());
+    assertTrue(newInstanceStateMap.values().stream().allMatch("ONLINE"::equals));
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
index 5fe390b..ca0de52 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/config/TableDataManagerConfig.java
@@ -35,6 +35,7 @@ public class TableDataManagerConfig {
   private static final String TABLE_DATA_MANAGER_DATA_DIRECTORY = "directory";
   private static final String TABLE_DATA_MANAGER_CONSUMER_DIRECTORY = "consumerDirectory";
   private static final String TABLE_DATA_MANAGER_NAME = "name";
+  private static final String TABLE_IS_DIMENSION = "isDimTable";
 
   private final Configuration _tableDataManagerConfig;
 
@@ -62,6 +63,10 @@ public class TableDataManagerConfig {
     return _tableDataManagerConfig.getString(TABLE_DATA_MANAGER_NAME);
   }
 
+  public boolean isDimTable() {
+    return _tableDataManagerConfig.getBoolean(TABLE_IS_DIMENSION, false); // false when the key was never set
+  }
+
   public static TableDataManagerConfig getDefaultHelixTableDataManagerConfig(
       @Nonnull InstanceDataManagerConfig instanceDataManagerConfig, @Nonnull String tableNameWithType) {
     Configuration defaultConfig = new PropertiesConfiguration();
@@ -79,7 +84,8 @@ public class TableDataManagerConfig {
   public void overrideConfigs(@Nonnull TableConfig tableConfig) {
     // Override table level configs
 
-    // Currently we do not override any table level configs into TableDataManagerConfig
+    _tableDataManagerConfig.addProperty(TABLE_IS_DIMENSION, tableConfig.isDimTable());
+
     // If we wish to override some table level configs using table config, override them here
     // Note: the configs in TableDataManagerConfig is immutable once the table is created, which mean it will not pick
     // up the latest table config
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index f3a6994..537009f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -37,6 +37,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 public class TableConfig extends BaseJsonConfig {
   public static final String TABLE_NAME_KEY = "tableName";
   public static final String TABLE_TYPE_KEY = "tableType";
+  public static final String IS_DIM_TABLE_KEY = "isDimTable";
   public static final String VALIDATION_CONFIG_KEY = "segmentsConfig";
   public static final String TENANT_CONFIG_KEY = "tenants";
   public static final String INDEXING_CONFIG_KEY = "tableIndexConfig";
@@ -62,6 +63,9 @@ public class TableConfig extends BaseJsonConfig {
   @JsonPropertyDescription(value = "The type of the table (OFFLINE|REALTIME) (mandatory)")
   private final TableType _tableType;
 
+  @JsonPropertyDescription("Indicates whether the table is a dimension table or not")
+  private final boolean _dimTable;
+
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private TenantConfig _tenantConfig;
   private IndexingConfig _indexingConfig;
@@ -104,7 +108,8 @@ public class TableConfig extends BaseJsonConfig {
       @JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig> fieldConfigList,
       @JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
       @JsonProperty(INGESTION_CONFIG_KEY) @Nullable IngestionConfig ingestionConfig,
-      @JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList) {
+      @JsonProperty(TIER_CONFIGS_LIST_KEY) @Nullable List<TierConfig> tierConfigsList,
+      @JsonProperty(IS_DIM_TABLE_KEY) boolean dimTable) {
     Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
     Preconditions.checkArgument(!tableName.contains(TABLE_NAME_FORBIDDEN_SUBSTRING),
         "'tableName' cannot contain double underscore ('__')");
@@ -130,6 +135,7 @@ public class TableConfig extends BaseJsonConfig {
     _upsertConfig = upsertConfig;
     _ingestionConfig = ingestionConfig;
     _tierConfigsList = tierConfigsList;
+    _dimTable = dimTable;
   }
 
   @JsonProperty(TABLE_NAME_KEY)
@@ -142,6 +148,11 @@ public class TableConfig extends BaseJsonConfig {
     return _tableType;
   }
 
+  @JsonProperty(IS_DIM_TABLE_KEY)
+  public boolean isDimTable() {
+    return this._dimTable; // true marks a dimension table; (de)serialized under IS_DIM_TABLE_KEY
+  }
+
   @JsonProperty(VALIDATION_CONFIG_KEY)
   public SegmentsValidationAndRetentionConfig getValidationConfig() {
     return _validationConfig;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 3a762a5..52f05b4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -55,6 +55,7 @@ public class TableConfigBuilder {
 
   private final TableType _tableType;
   private String _tableName;
+  private boolean _isDimTable;
   private boolean _isLLC;
 
   // Segments config related
@@ -114,6 +115,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setIsDimTable(boolean isDimTable) {
+    this._isDimTable = isDimTable; // forwarded to the TableConfig constructor by build()
+    return this;
+  }
+
   public TableConfigBuilder setLLC(boolean isLLC) {
     Preconditions.checkState(_tableType == TableType.REALTIME);
     _isLLC = isLLC;
@@ -380,6 +386,6 @@ public class TableConfigBuilder {
 
     return new TableConfig(_tableName, _tableType.toString(), validationConfig, tenantConfig, indexingConfig,
         _customConfig, _quotaConfig, _taskConfig, _routingConfig, _queryConfig, _instanceAssignmentConfigMap,
-        _fieldConfigList, _upsertConfig, _ingestionConfig, _tierConfigList);
+        _fieldConfigList, _upsertConfig, _ingestionConfig, _tierConfigList, _isDimTable);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org