You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ri...@apache.org on 2022/03/21 12:03:38 UTC
[pinot] branch master updated: Refresh ZK metadata when dimension table is updated (#8133)
This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 681da61 Refresh ZK metadata when dimension table is updated (#8133)
681da61 is described below
commit 681da619a3921e6cd92179a0d638d5b4270100b6
Author: Mark Needham <m....@gmail.com>
AuthorDate: Mon Mar 21 12:03:10 2022 +0000
Refresh ZK metadata when dimension table is updated (#8133)
* Refresh ZK metadata when dimension table is updated
* Update DimensionTableDataManagerTest.java
* Move all fields into a single class held in a volatile reference and swapped in the CAS loop (as per Richard's feedback)
* Add missing license header
* Return DimensionTable instead of passing it in
* Don't mutate the state of DimensionTable
---
.../core/data/manager/offline/DimensionTable.java | 57 ++++++++++++++++++++++
.../manager/offline/DimensionTableDataManager.java | 44 ++++++++++-------
.../offline/DimensionTableDataManagerTest.java | 57 ++++++++++++++++++++--
3 files changed, 136 insertions(+), 22 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
new file mode 100644
index 0000000..d738b5f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.offline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+
+
+class DimensionTable {
+
+ private final Map<PrimaryKey, GenericRow> _lookupTable;
+ private final Schema _tableSchema;
+ private final List<String> _primaryKeyColumns;
+
+ DimensionTable(Schema tableSchema, List<String> primaryKeyColumns) {
+ this(tableSchema, primaryKeyColumns, new HashMap<>());
+ }
+
+ DimensionTable(Schema tableSchema, List<String> primaryKeyColumns, Map<PrimaryKey, GenericRow> lookupTable) {
+ _lookupTable = lookupTable;
+ _tableSchema = tableSchema;
+ _primaryKeyColumns = primaryKeyColumns;
+ }
+
+ List<String> getPrimaryKeyColumns() {
+ return _primaryKeyColumns;
+ }
+
+ GenericRow get(PrimaryKey pk) {
+ return _lookupTable.get(pk);
+ }
+
+ FieldSpec getFieldSpecFor(String columnName) {
+ return _tableSchema.getFieldSpecFor(columnName);
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index f9a87c4..46a3b11 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -48,6 +50,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
*/
@ThreadSafe
public class DimensionTableDataManager extends OfflineTableDataManager {
+
// Storing singletons per table in a HashMap
private static final Map<String, DimensionTableDataManager> INSTANCES = new ConcurrentHashMap<>();
@@ -73,19 +76,18 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
}
@SuppressWarnings("rawtypes")
- private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, Map> UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, Map.class, "_lookupTable");
- private volatile Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>();
- private Schema _tableSchema;
- private List<String> _primaryKeyColumns;
+ private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class,
+ DimensionTable.class, "_dimensionTable");
+
+ private volatile DimensionTable _dimensionTable;
@Override
protected void doInit() {
super.doInit();
-
// dimension tables should always have schemas with primary keys
- _tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
- _primaryKeyColumns = _tableSchema.getPrimaryKeyColumns();
+ Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
+ _dimensionTable = new DimensionTable(tableSchema, tableSchema.getPrimaryKeyColumns());
}
@Override
@@ -118,17 +120,17 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
*/
private void loadLookupTable()
throws Exception {
- Map<PrimaryKey, GenericRow> snapshot;
- Map<PrimaryKey, GenericRow> replacement;
+ DimensionTable snapshot;
+ DimensionTable replacement;
do {
- snapshot = _lookupTable;
- replacement = new HashMap<>(snapshot.size());
- populate(replacement);
+ snapshot = _dimensionTable;
+ replacement = createDimensionTable();
} while (!UPDATER.compareAndSet(this, snapshot, replacement));
}
- private void populate(Map<PrimaryKey, GenericRow> map)
+ private DimensionTable createDimensionTable()
throws Exception {
+ Map<PrimaryKey, GenericRow> map = new HashMap<>();
List<SegmentDataManager> segmentManagers = acquireAllSegments();
try {
for (SegmentDataManager segmentManager : segmentManagers) {
@@ -137,10 +139,16 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
indexSegment.getSegmentMetadata().getIndexDir())) {
while (reader.hasNext()) {
GenericRow row = reader.next();
- map.put(row.getPrimaryKey(_primaryKeyColumns), row);
+ map.put(row.getPrimaryKey(_dimensionTable.getPrimaryKeyColumns()), row);
}
}
}
+
+ ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+ Schema tableSchema = ZKMetadataProvider.getTableSchema(propertyStore, _tableNameWithType);
+ List<String> primaryKeyColumns = tableSchema.getPrimaryKeyColumns();
+ return new DimensionTable(tableSchema, primaryKeyColumns, map);
+
} finally {
for (SegmentDataManager segmentManager : segmentManagers) {
releaseSegment(segmentManager);
@@ -149,14 +157,14 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
}
public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
- return _lookupTable.get(pk);
+ return _dimensionTable.get(pk);
}
public FieldSpec getColumnFieldSpec(String columnName) {
- return _tableSchema.getFieldSpecFor(columnName);
+ return _dimensionTable.getFieldSpecFor(columnName);
}
public List<String> getPrimaryKeyColumns() {
- return _primaryKeyColumns;
+ return _dimensionTable.getPrimaryKeyColumns();
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 0daf200..dbce8cb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -100,7 +100,22 @@ public class DimensionTableDataManagerTest {
return propertyStore;
}
- private DimensionTableDataManager makeTestableManager() {
+ private ZkHelixPropertyStore mockPropertyStoreWithNewColumn() {
+ String baseballTeamsSchemaStr =
+ "{\"schemaName\":\"dimBaseballTeams\",\"dimensionFieldSpecs\":[{\"name\":\"teamID\",\"dataType\":\"STRING\"},"
+ + "{\"name\":\"teamName\",\"dataType\":\"STRING\"}, {\"name\":\"teamCity\",\"dataType\":\"STRING\"}],"
+ + "\"primaryKeyColumns\":[\"teamID\"]}";
+ ZNRecord zkSchemaRec = new ZNRecord("dimBaseballTeams");
+ zkSchemaRec.setSimpleField("schemaJSON", baseballTeamsSchemaStr);
+
+ ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+ when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).
+ thenReturn(zkSchemaRec);
+
+ return propertyStore;
+ }
+
+ private DimensionTableDataManager makeTestableManager(HelixManager helixManager) {
DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(TABLE_NAME);
TableDataManagerConfig config;
{
@@ -109,7 +124,7 @@ public class DimensionTableDataManagerTest {
when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
}
tableDataManager.init(config, "dummyInstance", mockPropertyStore(),
- new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null);
+ new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null);
tableDataManager.start();
return tableDataManager;
@@ -118,7 +133,10 @@ public class DimensionTableDataManagerTest {
@Test
public void instantiationTests()
throws Exception {
- DimensionTableDataManager mgr = makeTestableManager();
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore propertyStore = mockPropertyStore();
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ DimensionTableDataManager mgr = makeTestableManager(helixManager);
Assert.assertEquals(mgr.getTableName(), TABLE_NAME);
// fetch the same instance via static method
@@ -144,7 +162,10 @@ public class DimensionTableDataManagerTest {
@Test
public void lookupTests()
throws Exception {
- DimensionTableDataManager mgr = makeTestableManager();
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore propertyStore = mockPropertyStore();
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+ DimensionTableDataManager mgr = makeTestableManager(helixManager);
// try fetching data BEFORE loading segment
GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
@@ -177,4 +198,32 @@ public class DimensionTableDataManagerTest {
resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
Assert.assertNull(resp, "Response should be null if no segment is loaded");
}
+
+ @Test
+ public void onRefreshDimensionTable()
+ throws Exception {
+ HelixManager helixManager = mock(HelixManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mockPropertyStoreWithNewColumn();
+ when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+ DimensionTableDataManager mgr = makeTestableManager(helixManager);
+
+ mgr.addSegment(_indexDir, _indexLoadingConfig);
+
+ // Confirm table is loaded and available for lookup
+ GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
+ Assert.assertNotNull(resp, "Should return response after segment load");
+ Assert.assertEquals(resp.getValue("teamName"), "San Francisco Giants");
+
+ // WHEN (segment is refreshed)
+
+ mgr.addSegment(_indexDir, _indexLoadingConfig);
+
+ // THEN
+ FieldSpec teamCitySpec = mgr.getColumnFieldSpec("teamCity");
+ Assert.assertNotNull(teamCitySpec, "Should return spec for existing column");
+ Assert.assertEquals(teamCitySpec.getDataType(), FieldSpec.DataType.STRING,
+ "Should return correct data type for teamCity column");
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org