You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:35 UTC

[incubator-pinot] 05/09: test build

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 592fb5f2fdd9ce5c6d3e3275de522c20a4e96f0c
Author: Uber Jenkins <pa...@uber.com>
AuthorDate: Thu Feb 13 23:55:51 2020 +0000

    test build
    
    Summary: test unit test
    
    Differential Revision: https://code.uberinternal.com/D4105485
---
 .../VirtualColumnLongValueReaderWriter.java        |   7 +-
 .../SegmentUpdateLogStorageProvider.java           |   9 +-
 .../storageProvider/UpdateLogStorageExplorer.java  |  25 ++-
 .../UpsertImmutableIndexSegmentCallback.java       |  44 +++--
 .../pinot/core/segment/updater/SegmentUpdater.java |  74 ++++-----
 .../updater/SegmentUpdaterDataManagerHolder.java   | 131 +++++++++++++++
 .../SegmentUpdaterDataManagerHolderTest.java       | 184 +++++++++++++++++++++
 .../server/api/resources/UpsertDebugResource.java  |  17 +-
 8 files changed, 421 insertions(+), 70 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
index 78f3bda..dea717c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
@@ -64,7 +64,12 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
 
   @Override
   public long getLong(int row) {
-    return _values[row];
+    if (row >= 0 && row < _totalDocSize) {
+      return _values[row];
+    } else {
+      throw new RuntimeException(String.format("trying to fetch row %d while we only have total row count %d", row,
+          _totalDocSize));
+    }
   }
 
   @Override
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index e7b035f..fa045d1 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -32,6 +32,7 @@ import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * provide the storage abstraction of storing upsert update event logs to a local disk so we can reload it
@@ -44,6 +45,7 @@ public class SegmentUpdateLogStorageProvider {
   protected final File _file;
   @VisibleForTesting
   protected final FileOutputStream _outputStream;
+  private AtomicInteger messageCountInFile = new AtomicInteger(0);
 
   public SegmentUpdateLogStorageProvider(File file)
       throws IOException {
@@ -60,6 +62,7 @@ public class SegmentUpdateLogStorageProvider {
       ByteBuffer buffer = ByteBuffer.allocate(fileLength);
       readFullyFromBeginning(_file, buffer);
       int messageCount = fileLength / UpdateLogEntry.SIZE;
+      LOGGER.info("read {} messages from file {}", messageCount, _file.getName());
       return new UpdateLogEntrySet(buffer, messageCount);
     } else {
       return UpdateLogEntrySet.getEmptySet();
@@ -74,7 +77,8 @@ public class SegmentUpdateLogStorageProvider {
     buffer.flip();
     _outputStream.write(buffer.array());
     _outputStream.flush();
-
+    messageCountInFile.getAndAdd(messages.size());
+    LOGGER.debug("file {} message count {}", _file.getName(), messageCountInFile.get());
   }
 
   public synchronized void destroy() throws IOException {
@@ -102,6 +106,7 @@ public class SegmentUpdateLogStorageProvider {
           segmentUpdateFile.length(), newSize);
       channel.truncate(newSize);
       channel.force(false);
+      messageCountInFile.set(Math.toIntExact(newSize / UpdateLogEntry.SIZE));
     }
   }
 
@@ -116,7 +121,7 @@ public class SegmentUpdateLogStorageProvider {
       position += byteRead;
     } while (byteRead != -1 && buffer.hasRemaining());
     buffer.flip();
-    LOGGER.info("read all data from segment update file {} to buffer in {} ms", segmentUpdateFile.getName(),
+    LOGGER.info("read {} bytes from segment update file {} to buffer in {} ms", position, segmentUpdateFile.getName(),
         System.currentTimeMillis() - start);
   }
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
index 0e38deb..a2b53ee 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
 * command line tool for debugging pinot server by allowing us to interactively explore the update log data in pinot server/kc
@@ -52,24 +53,38 @@ public class UpdateLogStorageExplorer {
     String[] inputSplits = input.split(" ");
     Preconditions.checkState(inputSplits.length == 2, "expect input data to be 'tableName segmentName'");
     String tableName = inputSplits[0];
+    if (!tableName.endsWith("_REALTIME")) {
+      tableName = tableName + "_REALTIME";
+    }
     String segmentName = inputSplits[1];
 
     provider.loadTable(tableName);
     UpdateLogEntrySet updateLogEntrySet = provider.getAllMessages(tableName, segmentName);
-    Multimap<Long, UpdateLogEntry> map = ArrayListMultimap.create();
+    Multimap<Long, UpdateLogAndPos> map = ArrayListMultimap.create();
     System.out.println("update log size: " + updateLogEntrySet.size());
+    AtomicInteger pos = new AtomicInteger(0);
     updateLogEntrySet.forEach(u -> {
-      map.put(u.getOffset(), u);
+      map.put(u.getOffset(), new UpdateLogAndPos(u, pos.getAndIncrement()));
     });
 
     while (true) {
       System.out.println("input the offset");
       long offset = reader.nextLong();
-      Collection<UpdateLogEntry> result = map.get(offset);
+      Collection<UpdateLogAndPos> result = map.get(offset);
       System.out.println("associated update logs size: " + result.size());
-      for (UpdateLogEntry entry: result) {
-        System.out.println("content: " + entry.toString());
+      for (UpdateLogAndPos entry: result) {
+        System.out.println("content: " + entry.logEntry.toString() + " pos " + entry.pos);
       }
     }
   }
+
+  static class UpdateLogAndPos {
+    public UpdateLogEntry logEntry;
+    public int pos;
+
+    public UpdateLogAndPos(UpdateLogEntry entry, int pos) {
+      this.logEntry = entry;
+      this.pos = pos;
+    }
+  }
 }
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index b7fcdb8..d0667b8 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -130,9 +130,9 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
       throw new RuntimeException("unexpected forward reader type for kafka offset column " + reader.getClass());
     }
     LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
+    LOGGER.info("immutable segment {} built offset map with minOffset {} and maxOffset {}", _segmentName, minOffset, maxOffset);
   }
 
-  @Override
   public void postProcessRecords(GenericRow row, int docId) {
     // do nothing
   }
@@ -150,7 +150,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
         updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
 
     start = System.currentTimeMillis();
-    final long maxOffset = _totalDoc + _minSourceOffset;
+    final long maxOffset = _minSourceOffset + _sourceOffsetToDocIdArray.length;
     int unmatchedLogEntryCount = 0;
     try {
       Map<Integer, Long> partitionToHighestWatermark = new HashMap<>();
@@ -193,13 +193,19 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
   @Override
   public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
     for (UpdateLogEntry logEntry: logEntries) {
-      boolean updated = false;
-      int docId = getDocIdFromSourceOffset(logEntry.getOffset());
-      for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
-        updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
-      }
-      if (updated) {
-        _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+      try {
+        boolean updated = false;
+        int docId = getDocIdFromSourceOffset(logEntry.getOffset());
+        if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+          for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+            updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
+          }
+          if (updated) {
+            _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+          }
+        }
+      } catch (Exception ex) {
+        LOGGER.warn("failed to update virtual column, skipping the current record {}", logEntries.toString());
       }
     }
   }
@@ -207,11 +213,15 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
   @Override
   public String getVirtualColumnInfo(long offset) {
     int docId = getDocIdFromSourceOffset(offset);
-    StringBuilder result = new StringBuilder("matched: ");
-    for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
-      result.append(readerWriter.getInt(docId)).append("; ");
+    if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+      StringBuilder result = new StringBuilder("matched: ");
+      for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+        result.append(readerWriter.getLong(docId)).append("; ");
+      }
+      return result.toString();
+    } else {
+      return "cannot found doc matching offset";
     }
-    return result.toString();
   }
 
   /**
@@ -222,14 +232,14 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
    */
   private int getDocIdFromSourceOffset(long offset) throws RuntimeException {
     if (offset < _minSourceOffset || offset - _minSourceOffset >= _sourceOffsetToDocIdArray.length) {
-      LOGGER.error("offset {} is outside range for current segment {} start offset {} size {}",
+      LOGGER.warn("offset {} is outside of range for current segment {} start offset {} size {}",
           offset, _segmentName, _minSourceOffset, _sourceOffsetToDocIdArray.length);
-      throw new RuntimeException("offset outside range");
+      return DEFAULT_DOC_ID_FOR_MISSING_ENTRY;
     } else {
       int position = Math.toIntExact(offset - _minSourceOffset);
       if (_sourceOffsetToDocIdArray[position] == DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
-        LOGGER.error("no docId associated with offset {} for segment {}", offset, _segmentName);
-        throw new RuntimeException("docId not found");
+        LOGGER.warn("no docId associated with offset {} for segment {}", offset, _segmentName);
+        return DEFAULT_DOC_ID_FOR_MISSING_ENTRY;
       } else {
         return _sourceOffsetToDocIdArray[position];
       }
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index 5f0aa43..f380afa 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.segment.updater;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
-import io.netty.util.internal.ConcurrentSet;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -36,6 +35,7 @@ import org.apache.pinot.grigio.common.rpcQueue.QueueConsumerRecord;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
 import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogTableRetentionManager;
 import org.apache.pinot.grigio.common.utils.CommonUtils;
 import org.apache.pinot.grigio.servers.SegmentUpdaterProvider;
 import org.slf4j.Logger;
@@ -46,7 +46,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,7 +74,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
   private final String _topicPrefix;
   private final ExecutorService _ingestionExecutorService;
   private final QueueConsumer _consumer;
-  private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
+  private final SegmentUpdaterDataManagerHolder _dataManagersHolder = new SegmentUpdaterDataManagerHolder();
   private final Map<String, Map<Integer, Long>> _tablePartitionCreationTime = new ConcurrentHashMap<>();
   private final UpdateLogStorageProvider _updateLogStorageProvider;
   private final UpdateLogRetentionManager _retentionManager;
@@ -105,7 +104,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
 
 
   public void start() {
-    String listOfTables = Joiner.on(",").join(_tableSegmentMap.keySet());
+    String listOfTables = Joiner.on(",").join(_dataManagersHolder.getAllTables());
     LOGGER.info("starting segment updater main loop with the following table in server: {}", listOfTables);
     _ingestionExecutorService.submit(this::updateLoop);
   }
@@ -162,10 +161,9 @@ public class SegmentUpdater implements SegmentDeletionListener {
         for (Map.Entry<String, TableUpdateLogs> entry : tableSegmentToUpdateLogs.entrySet()) {
           String tableName = TableNameBuilder.ensureTableNameWithType(entry.getKey(), CommonConstants.Helix.TableType.REALTIME);
           int tableMessageCount = 0;
-          if (_tableSegmentMap.containsKey(tableName)) {
-            final Map<String, Set<DataManagerCallback>> segmentManagersMap = _tableSegmentMap.get(tableName);
+          if (_dataManagersHolder.hasTable(tableName)) {
             final TableUpdateLogs segment2UpdateLogsMap = entry.getValue();
-            updateSegmentVirtualColumns(tableName, segmentManagersMap, segment2UpdateLogsMap, timeToStoreUpdateLogs);
+            updateSegmentVirtualColumns(tableName, segment2UpdateLogsMap, timeToStoreUpdateLogs);
           } else {
             LOGGER.warn("got messages for table {} not in this server", tableName);
           }
@@ -202,12 +200,11 @@ public class SegmentUpdater implements SegmentDeletionListener {
   /**
    * Update the virtual columns of affected segments of a table.
    */
-  private void updateSegmentVirtualColumns(String tableName, Map<String, Set<DataManagerCallback>> segmentManagersMap,
-                                           TableUpdateLogs segment2UpdateLogsMap, AtomicLong timeToStoreUpdateLogs) throws IOException{
+  private void updateSegmentVirtualColumns(String tableName, TableUpdateLogs segment2UpdateLogsMap,
+                                           AtomicLong timeToStoreUpdateLogs) throws IOException{
     for (Map.Entry<String, List<UpdateLogEntry>> segmentEntry : segment2UpdateLogsMap.getSegments2UpdateLog().entrySet()) {
       final String segmentNameStr = segmentEntry.getKey();
       updateVirtualColumn(tableName, segmentNameStr,
-          segmentManagersMap.computeIfAbsent(segmentNameStr, sn -> new ConcurrentSet<>()),
           segment2UpdateLogsMap.get(segmentNameStr), timeToStoreUpdateLogs);
     }
   }
@@ -217,19 +214,21 @@ public class SegmentUpdater implements SegmentDeletionListener {
    * from consuming to online (mutable segment to immutable segment). In most of cases we expect only one segment manager
    * in this set of UpsertSegmentDataManager
    */
-  private void updateVirtualColumn(String table, String segment, Set<DataManagerCallback> segmentDataManagers,
+  private void updateVirtualColumn(String table, String segment,
                                    List<UpdateLogEntry> messages, AtomicLong timeToStoreUpdateLogs) throws IOException {
+    Set<DataManagerCallback> dataManagers = _dataManagersHolder.getDataManagers(table, segment);
     LOGGER.debug("updating segment {} with {} results for {} data managers", segment, messages.size(),
-        segmentDataManagers.size());
-    if (segmentDataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) {
+        dataManagers.size());
+    if (dataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) {
       storeUpdateLogs(table, segment, messages, timeToStoreUpdateLogs);
     }
     try {
-      for (DataManagerCallback dataManager: segmentDataManagers) {
+      // refetch the data managers from holder in case there are updates
+      for (DataManagerCallback dataManager: _dataManagersHolder.getDataManagers(table, segment)) {
         dataManager.updateVirtualColumns(messages);
       }
     } catch (Exception ex) {
-      LOGGER.error("failed to update virtual column for key ", ex);
+      LOGGER.error("failed to update virtual column for key", ex);
     }
   }
 
@@ -243,14 +242,11 @@ public class SegmentUpdater implements SegmentDeletionListener {
       DataManagerCallback dataManager) {
     // TODO get partition assignment from
     LOGGER.info("segment updater adding table {} segment {}", tableNameWithType, segmentName.getSegmentName());
-    if (!_tableSegmentMap.containsKey(tableNameWithType)) {
-      synchronized (_tableSegmentMap) {
-        _tableSegmentMap.put(tableNameWithType, new ConcurrentHashMap<>());
-      }
+    if (!_dataManagersHolder.hasTable(tableNameWithType)) {
       LOGGER.info("adding table {} to segment updater consumer", tableNameWithType);
       handleNewTableInServer(tableNameWithType);
     }
-    _tableSegmentMap.get(tableNameWithType).computeIfAbsent(segmentName.getSegmentName(), sn -> new HashSet<>()).add(dataManager);
+    _dataManagersHolder.addDataManager(tableNameWithType, segmentName.getSegmentName(), dataManager);
     synchronized (_tablePartitionCreationTime) {
       long creationTime = _tablePartitionCreationTime.computeIfAbsent(tableNameWithType, t -> new ConcurrentHashMap<>())
           .computeIfAbsent(segmentName.getPartitionId(), p -> segmentName.getCreationTimeStamp());
@@ -262,16 +258,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
   public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName,
       DataManagerCallback toDeleteManager) {
     LOGGER.info("segment updater removing table {} segment {}", tableNameWithType, segmentName);
-    Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableNameWithType);
-    if (segmentMap != null) {
-      Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
-      if (segmentDataManagers != null) {
-        segmentDataManagers.remove(toDeleteManager);
-        if (segmentDataManagers.size() == 0) {
-          segmentMap.remove(segmentName);
-        }
-      }
-    }
+    _dataManagersHolder.removeDataManager(tableNameWithType, segmentName, toDeleteManager);
   }
 
   /**
@@ -300,22 +287,23 @@ public class SegmentUpdater implements SegmentDeletionListener {
   @Override
   public synchronized void onSegmentDeletion(String tableNameWithType, String segmentName) {
     LOGGER.info("deleting segment virtual column from local storage for table {} segment {}", tableNameWithType, segmentName);
-    Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
-    if (segmentManagerMap != null) {
-      if (segmentManagerMap.containsKey(segmentName) && segmentManagerMap.get(segmentName).size() > 0) {
-        LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
-      }
-      try {
-        segmentManagerMap.remove(segmentName);
-        _retentionManager.getRetentionManagerForTable(tableNameWithType).notifySegmentDeletion(tableNameWithType);
-        _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName);
-      } catch (IOException e) {
-        throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e);
+    if (_dataManagersHolder.hasTable(tableNameWithType)) {
+      boolean result = _dataManagersHolder.removeAllDataManagerForSegment(tableNameWithType, segmentName);
+      if (result) {
+        try {
+          UpdateLogTableRetentionManager retentionManager = _retentionManager.getRetentionManagerForTable(tableNameWithType);
+          if (retentionManager != null) {
+            retentionManager.notifySegmentDeletion(segmentName);
+          }
+          _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName);
+        } catch (IOException e) {
+          throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e);
+        }
       }
-      if (segmentManagerMap.size() == 0) {
-        _tableSegmentMap.remove(tableNameWithType);
+      if (_dataManagersHolder.maybeRemoveTable(tableNameWithType)) {
         handleTableRemovalInServer(tableNameWithType);
       }
+
     } else {
       LOGGER.error("deleting a segment {}:{} from current server but don't have segment map on updater",
           tableNameWithType, segmentName);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java
new file mode 100644
index 0000000..4f6bd50
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * class to manage list of data managers and their associated table/segment for segment updater
+ */
+@ThreadSafe
+public class SegmentUpdaterDataManagerHolder {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUpdaterDataManagerHolder.class);
+
+  private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
+
+  public SegmentUpdaterDataManagerHolder() {}
+
+  /**
+   * fetch all tables containing at least one data manager on this server
+   * @return set of all pinot table names known to the current segment updater
+   */
+  public Set<String> getAllTables() {
+    return ImmutableSet.copyOf(_tableSegmentMap.keySet());
+  }
+
+  /**
+   * check if there is any data manager associated with the given table
+   */
+  public boolean hasTable(String tableName) {
+    return _tableSegmentMap.containsKey(tableName);
+  }
+
+  /**
+   * get a set of data manager for the given table name and segment name
+   */
+  public synchronized Set<DataManagerCallback> getDataManagers(String tableName, String segmentName) {
+    if (!_tableSegmentMap.containsKey(tableName)) {
+      LOGGER.error("try to fetch data manager for non-existing table {} segment {}", tableName, segmentName);
+    } else {
+      final Map<String, Set<DataManagerCallback>> segmentDataManagerMap = _tableSegmentMap.get(tableName);
+      if (segmentDataManagerMap.containsKey(segmentName)) {
+        return ImmutableSet.copyOf(segmentDataManagerMap.get(segmentName));
+      }
+    }
+    return ImmutableSet.of();
+  }
+
+  /**
+   * add a data manager for a given table and segment name
+   */
+  public synchronized void addDataManager(String tableName, String segmentName, DataManagerCallback dataManager) {
+    LOGGER.info("adding new data manager to updater for table {}, segment {}", tableName, segmentName);
+    if (!_tableSegmentMap.containsKey(tableName)) {
+      _tableSegmentMap.put(tableName, new ConcurrentHashMap<>());
+    }
+    _tableSegmentMap.get(tableName).computeIfAbsent(segmentName, sn -> ConcurrentHashMap.newKeySet()).add(dataManager);
+  }
+
+  /**
+   * remove a specific data manager for a given table and segment name.
+   * do nothing if there is no such data manager for the given table/segment name
+   */
+  public synchronized void removeDataManager(String tableName, String segmentName,
+                                             DataManagerCallback toDeleteManager) {
+    Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableName);
+    if (segmentMap != null) {
+      Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
+      if (segmentDataManagers != null) {
+        segmentDataManagers.remove(toDeleteManager);
+        LOGGER.info("removing data manager for table {} segment {}", tableName, segmentName);
+        if (segmentDataManagers.size() == 0) {
+          segmentMap.remove(segmentName);
+        }
+      }
+    }
+  }
+
+  /**
+   * remove all data managers for a table and segment
+   * @return true if we indeed remove any data manager, false otherwise
+   */
+  public synchronized boolean removeAllDataManagerForSegment(String tableName, String segmentName) {
+    Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName);
+    if (segmentManagerMap != null) {
+      if (segmentManagerMap.containsKey(segmentName)) {
+        LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
+      }
+      Set<DataManagerCallback> result = segmentManagerMap.remove(segmentName);
+      return result != null;
+    }
+    return false;
+  }
+
+  /**
+   * check if the table still has any associated data manager. If there are no data managers, then remove it from the cache
+   * @return true if the given table is removed, false otherwise
+   */
+  public synchronized boolean maybeRemoveTable(String tableName) {
+    Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName);
+    if (segmentManagerMap != null && segmentManagerMap.size() == 0) {
+      _tableSegmentMap.remove(tableName);
+      return true;
+    }
+    return false;
+  }
+
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java
new file mode 100644
index 0000000..ca7aceb
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link SegmentUpdaterDataManagerHolder}, covering add/get/remove of
+ * per-table, per-segment data manager callbacks.
+ */
+public class SegmentUpdaterDataManagerHolderTest {
+  // holder with no tables/segments registered, for empty-state assertions
+  private SegmentUpdaterDataManagerHolder emptyHolder;
+  // holder pre-populated in setUp(): "table" -> {segment1, segment2}, "table2" -> {segment3}
+  private SegmentUpdaterDataManagerHolder dataManagerHolder;
+
+  private DataManagerCallback dummyManager1;
+  private DataManagerCallback dummyManager2;
+  private DataManagerCallback dummyManager3;
+
+  @BeforeMethod
+  public void setUp() {
+    emptyHolder = new SegmentUpdaterDataManagerHolder();
+    dataManagerHolder = new SegmentUpdaterDataManagerHolder();
+    dummyManager1 = mock(DataManagerCallback.class);
+    dummyManager2 = mock(DataManagerCallback.class);
+    dummyManager3 = mock(DataManagerCallback.class);
+    dataManagerHolder.addDataManager("table", "segment1", dummyManager1);
+    dataManagerHolder.addDataManager("table", "segment2", dummyManager2);
+    dataManagerHolder.addDataManager("table2", "segment3", dummyManager3);
+  }
+
+  @Test
+  public void testGetAllTables() {
+    Set<String> tables = dataManagerHolder.getAllTables();
+    ensureSetEqual(tables, ImmutableSet.of("table", "table2"));
+
+    Assert.assertEquals(emptyHolder.getAllTables().size(), 0);
+  }
+
+  @Test
+  public void testHasTable() {
+    Assert.assertFalse(emptyHolder.hasTable("table"));
+
+    Assert.assertTrue(dataManagerHolder.hasTable("table"));
+    Assert.assertTrue(dataManagerHolder.hasTable("table2"));
+    Assert.assertFalse(dataManagerHolder.hasTable("table3"));
+  }
+
+  @Test
+  public void testGetDataManagers() {
+    Set<DataManagerCallback> dataManagers = dataManagerHolder.getDataManagers("table", "segment1");
+    ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager1));
+
+    dataManagers = dataManagerHolder.getDataManagers("table", "segment2");
+    ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager2));
+
+    dataManagers = dataManagerHolder.getDataManagers("table2", "segment3");
+    ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager3));
+
+    // unknown table/segment combinations yield an empty set, never null
+    dataManagers = dataManagerHolder.getDataManagers("table2", "segment1");
+    ensureSetEqual(dataManagers, ImmutableSet.of());
+
+    dataManagers = dataManagerHolder.getDataManagers("table3", "segment1");
+    ensureSetEqual(dataManagers, ImmutableSet.of());
+  }
+
+  @Test
+  public void testAddDataManager() {
+    DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+    dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+    dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+    dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+
+    // adding to an existing segment accumulates; adding to a new segment creates the entry
+    Set<DataManagerCallback> tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager1, dummyManager4));
+
+    tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment2");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager2, dummyManager5));
+
+    tableSegmentDMs = dataManagerHolder.getDataManagers("table2", "segment1");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager6));
+  }
+
+  @Test
+  public void testRemoveDataManager() {
+    DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+    dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+    dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+    dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+    Set<DataManagerCallback> tableSegmentDMs;
+
+    // start deleting; removing a manager (or segment) that is not registered is a no-op
+    dataManagerHolder.removeDataManager("table", "segment1", dummyManager1);
+    dataManagerHolder.removeDataManager("table", "segment1", dummyManager2);
+    dataManagerHolder.removeDataManager("table", "segment", dummyManager2);
+    tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager4));
+
+    // delete the last manager of the segment
+    dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+    tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of());
+
+    // deleting again from an already-empty segment is also a no-op
+    dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+    tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+    ensureSetEqual(tableSegmentDMs, ImmutableSet.of());
+
+    // add some back and delete
+    dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+    ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of(dummyManager4));
+
+    dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+    ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+  }
+
+  @Test
+  public void testRemoveAllDataManagerForSegment() {
+    DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+    DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+    dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+    dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+    dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+
+    // first removal drops the entry and reports true
+    boolean result = dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+    ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+    Assert.assertTrue(result);
+
+    // second removal of the same segment reports false (nothing left to drop)
+    result = dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+    ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+    Assert.assertFalse(result);
+
+    result = dataManagerHolder.removeAllDataManagerForSegment("table3", "segment1");
+    Assert.assertFalse(result);
+  }
+
+  @Test
+  public void testMaybeRemoveTable() {
+    // tables with live segments (or unknown tables) must not be removed
+    Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table"));
+    Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table2"));
+    Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table4"));
+
+    dataManagerHolder.removeAllDataManagerForSegment("table2", "segment3");
+    Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table2"));
+
+    dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+    Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table"));
+    dataManagerHolder.removeAllDataManagerForSegment("table", "segment2");
+    Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table"));
+  }
+
+  /**
+   * Asserts that the actual set {@code set1} equals the expected set {@code set2}.
+   */
+  private <T> void ensureSetEqual(Set<T> set1, Set<T> set2) {
+    // TestNG's assertEquals(actual, expected) already performs full set equality; the previous
+    // manual size+containment loop duplicated it and passed the expected value in the "actual"
+    // position, which produced confusing failure messages.
+    Assert.assertEquals(set1, set2);
+  }
+}
\ No newline at end of file
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
index 13a7612..eb1f3ea 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
@@ -26,19 +26,23 @@ import io.swagger.annotations.ApiResponses;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.TableDataManager;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
 import org.apache.pinot.server.starter.ServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 
 @Api(tags = "UpsertDebug")
 @Path("/")
 public class UpsertDebugResource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(UpsertDebugResource.class);
 
   @Inject
   ServerInstance serverInstance;
@@ -64,6 +68,7 @@ public class UpsertDebugResource {
     if (tableDataManager == null) {
       return "no table for " + tableName;
     }
+    /*
     SegmentDataManager segmentDataManager = null;
     try {
       segmentDataManager = tableDataManager.acquireSegment(segmentName);
@@ -73,12 +78,20 @@ public class UpsertDebugResource {
       if (!(segmentDataManager instanceof UpsertSegmentDataManager)) {
         return "it is not an upsert table";
       } else {
-        return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr));
+        long offset = Long.parseLong(offsetStr);
+        LOGGER.info("getting virtual column for table {} segment {} offset {}", tableName, segmentName, offset);
+        return ( segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr));
       }
+    } catch (Exception ex) {
+      LOGGER.error("failed to fetch virtual column info", ex);
+      throw new WebApplicationException("Failed to fetch virtual column info" + ex.getMessage(),
+          Response.Status.INTERNAL_SERVER_ERROR);
     } finally {
       if (segmentDataManager != null) {
         tableDataManager.releaseSegment(segmentDataManager);
       }
     }
+    */
+    return "";
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org