You are viewing a plain-text version of this content; the canonical link to it is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ri...@apache.org on 2022/03/21 12:03:38 UTC

[pinot] branch master updated: Refresh ZK metadata when dimension table is updated (#8133)

This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 681da61  Refresh ZK metadata when dimension table is updated (#8133)
681da61 is described below

commit 681da619a3921e6cd92179a0d638d5b4270100b6
Author: Mark Needham <m....@gmail.com>
AuthorDate: Mon Mar 21 12:03:10 2022 +0000

    Refresh ZK metadata when dimension table is updated (#8133)
    
    * Refresh ZK metadata when dimension table is updated
    
    * Update DimensionTableDataManagerTest.java
    
    * Move all fields into a volatile class that is swapped via the CAS loop (as per Richard's feedback)
    
    * Add missing license header
    
    * Return DimensionTable instead of passing it in
    
    * Don't mutate the state of DimensionTable
---
 .../core/data/manager/offline/DimensionTable.java  | 57 ++++++++++++++++++++++
 .../manager/offline/DimensionTableDataManager.java | 44 ++++++++++-------
 .../offline/DimensionTableDataManagerTest.java     | 57 ++++++++++++++++++++--
 3 files changed, 136 insertions(+), 22 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
new file mode 100644
index 0000000..d738b5f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTable.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.offline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.PrimaryKey;
+
+
+class DimensionTable {
+
+  private final Map<PrimaryKey, GenericRow> _lookupTable;
+  private final Schema _tableSchema;
+  private final List<String> _primaryKeyColumns;
+
+  DimensionTable(Schema tableSchema, List<String> primaryKeyColumns) {
+    this(tableSchema, primaryKeyColumns, new HashMap<>());
+  }
+
+  DimensionTable(Schema tableSchema, List<String> primaryKeyColumns, Map<PrimaryKey, GenericRow> lookupTable) {
+    _lookupTable = lookupTable;
+    _tableSchema = tableSchema;
+    _primaryKeyColumns = primaryKeyColumns;
+  }
+
+  List<String> getPrimaryKeyColumns() {
+    return _primaryKeyColumns;
+  }
+
+  GenericRow get(PrimaryKey pk) {
+    return _lookupTable.get(pk);
+  }
+
+  FieldSpec getFieldSpecFor(String columnName) {
+    return _tableSchema.getFieldSpecFor(columnName);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index f9a87c4..46a3b11 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -48,6 +50,7 @@ import org.apache.pinot.spi.data.readers.PrimaryKey;
  */
 @ThreadSafe
 public class DimensionTableDataManager extends OfflineTableDataManager {
+
   // Storing singletons per table in a HashMap
   private static final Map<String, DimensionTableDataManager> INSTANCES = new ConcurrentHashMap<>();
 
@@ -73,19 +76,18 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
   }
 
   @SuppressWarnings("rawtypes")
-  private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, Map> UPDATER =
-      AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class, Map.class, "_lookupTable");
-  private volatile Map<PrimaryKey, GenericRow> _lookupTable = new HashMap<>();
-  private Schema _tableSchema;
-  private List<String> _primaryKeyColumns;
+  private static final AtomicReferenceFieldUpdater<DimensionTableDataManager, DimensionTable> UPDATER =
+      AtomicReferenceFieldUpdater.newUpdater(DimensionTableDataManager.class,
+          DimensionTable.class, "_dimensionTable");
+
+  private volatile DimensionTable _dimensionTable;
 
   @Override
   protected void doInit() {
     super.doInit();
-
     // dimension tables should always have schemas with primary keys
-    _tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
-    _primaryKeyColumns = _tableSchema.getPrimaryKeyColumns();
+    Schema tableSchema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
+    _dimensionTable = new DimensionTable(tableSchema, tableSchema.getPrimaryKeyColumns());
   }
 
   @Override
@@ -118,17 +120,17 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
    */
   private void loadLookupTable()
       throws Exception {
-    Map<PrimaryKey, GenericRow> snapshot;
-    Map<PrimaryKey, GenericRow> replacement;
+    DimensionTable snapshot;
+    DimensionTable replacement;
     do {
-      snapshot = _lookupTable;
-      replacement = new HashMap<>(snapshot.size());
-      populate(replacement);
+      snapshot = _dimensionTable;
+      replacement = createDimensionTable();
     } while (!UPDATER.compareAndSet(this, snapshot, replacement));
   }
 
-  private void populate(Map<PrimaryKey, GenericRow> map)
+  private DimensionTable createDimensionTable()
       throws Exception {
+    Map<PrimaryKey, GenericRow> map = new HashMap<>();
     List<SegmentDataManager> segmentManagers = acquireAllSegments();
     try {
       for (SegmentDataManager segmentManager : segmentManagers) {
@@ -137,10 +139,16 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
             indexSegment.getSegmentMetadata().getIndexDir())) {
           while (reader.hasNext()) {
             GenericRow row = reader.next();
-            map.put(row.getPrimaryKey(_primaryKeyColumns), row);
+            map.put(row.getPrimaryKey(_dimensionTable.getPrimaryKeyColumns()), row);
           }
         }
       }
+
+      ZkHelixPropertyStore<ZNRecord> propertyStore = _helixManager.getHelixPropertyStore();
+      Schema tableSchema = ZKMetadataProvider.getTableSchema(propertyStore, _tableNameWithType);
+      List<String> primaryKeyColumns = tableSchema.getPrimaryKeyColumns();
+      return new DimensionTable(tableSchema, primaryKeyColumns, map);
+
     } finally {
       for (SegmentDataManager segmentManager : segmentManagers) {
         releaseSegment(segmentManager);
@@ -149,14 +157,14 @@ public class DimensionTableDataManager extends OfflineTableDataManager {
   }
 
   public GenericRow lookupRowByPrimaryKey(PrimaryKey pk) {
-    return _lookupTable.get(pk);
+    return _dimensionTable.get(pk);
   }
 
   public FieldSpec getColumnFieldSpec(String columnName) {
-    return _tableSchema.getFieldSpecFor(columnName);
+    return _dimensionTable.getFieldSpecFor(columnName);
   }
 
   public List<String> getPrimaryKeyColumns() {
-    return _primaryKeyColumns;
+    return _dimensionTable.getPrimaryKeyColumns();
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 0daf200..dbce8cb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -100,7 +100,22 @@ public class DimensionTableDataManagerTest {
     return propertyStore;
   }
 
-  private DimensionTableDataManager makeTestableManager() {
+  private ZkHelixPropertyStore mockPropertyStoreWithNewColumn() {
+    String baseballTeamsSchemaStr =
+        "{\"schemaName\":\"dimBaseballTeams\",\"dimensionFieldSpecs\":[{\"name\":\"teamID\",\"dataType\":\"STRING\"},"
+            + "{\"name\":\"teamName\",\"dataType\":\"STRING\"}, {\"name\":\"teamCity\",\"dataType\":\"STRING\"}],"
+            + "\"primaryKeyColumns\":[\"teamID\"]}";
+    ZNRecord zkSchemaRec = new ZNRecord("dimBaseballTeams");
+    zkSchemaRec.setSimpleField("schemaJSON", baseballTeamsSchemaStr);
+
+    ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class);
+    when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).
+        thenReturn(zkSchemaRec);
+
+    return propertyStore;
+  }
+
+  private DimensionTableDataManager makeTestableManager(HelixManager helixManager) {
     DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(TABLE_NAME);
     TableDataManagerConfig config;
     {
@@ -109,7 +124,7 @@ public class DimensionTableDataManagerTest {
       when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     }
     tableDataManager.init(config, "dummyInstance", mockPropertyStore(),
-        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), mock(HelixManager.class), null);
+        new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), helixManager, null);
     tableDataManager.start();
 
     return tableDataManager;
@@ -118,7 +133,10 @@ public class DimensionTableDataManagerTest {
   @Test
   public void instantiationTests()
       throws Exception {
-    DimensionTableDataManager mgr = makeTestableManager();
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore propertyStore = mockPropertyStore();
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    DimensionTableDataManager mgr = makeTestableManager(helixManager);
     Assert.assertEquals(mgr.getTableName(), TABLE_NAME);
 
     // fetch the same instance via static method
@@ -144,7 +162,10 @@ public class DimensionTableDataManagerTest {
   @Test
   public void lookupTests()
       throws Exception {
-    DimensionTableDataManager mgr = makeTestableManager();
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore propertyStore = mockPropertyStore();
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+    DimensionTableDataManager mgr = makeTestableManager(helixManager);
 
     // try fetching data BEFORE loading segment
     GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
@@ -177,4 +198,32 @@ public class DimensionTableDataManagerTest {
     resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
     Assert.assertNull(resp, "Response should be null if no segment is loaded");
   }
+
+  @Test
+  public void onRefreshDimensionTable()
+      throws Exception {
+    HelixManager helixManager = mock(HelixManager.class);
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mockPropertyStoreWithNewColumn();
+    when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+    DimensionTableDataManager mgr = makeTestableManager(helixManager);
+
+    mgr.addSegment(_indexDir, _indexLoadingConfig);
+
+    // Confirm table is loaded and available for lookup
+    GenericRow resp = mgr.lookupRowByPrimaryKey(new PrimaryKey(new String[]{"SF"}));
+    Assert.assertNotNull(resp, "Should return response after segment load");
+    Assert.assertEquals(resp.getValue("teamName"), "San Francisco Giants");
+
+    // WHEN (segment is refreshed)
+
+    mgr.addSegment(_indexDir, _indexLoadingConfig);
+
+    // THEN
+    FieldSpec teamCitySpec = mgr.getColumnFieldSpec("teamCity");
+    Assert.assertNotNull(teamCitySpec, "Should return spec for existing column");
+    Assert.assertEquals(teamCitySpec.getDataType(), FieldSpec.DataType.STRING,
+        "Should return correct data type for teamCity column");
+
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org