You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/23 19:27:35 UTC
[incubator-pinot] 05/09: test build
This is an automated email from the ASF dual-hosted git repository.
jamesshao pushed a commit to branch upsert-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 592fb5f2fdd9ce5c6d3e3275de522c20a4e96f0c
Author: Uber Jenkins <pa...@uber.com>
AuthorDate: Thu Feb 13 23:55:51 2020 +0000
test build
Summary: test unit test
Differential Revision: https://code.uberinternal.com/D4105485
---
.../VirtualColumnLongValueReaderWriter.java | 7 +-
.../SegmentUpdateLogStorageProvider.java | 9 +-
.../storageProvider/UpdateLogStorageExplorer.java | 25 ++-
.../UpsertImmutableIndexSegmentCallback.java | 44 +++--
.../pinot/core/segment/updater/SegmentUpdater.java | 74 ++++-----
.../updater/SegmentUpdaterDataManagerHolder.java | 131 +++++++++++++++
.../SegmentUpdaterDataManagerHolderTest.java | 184 +++++++++++++++++++++
.../server/api/resources/UpsertDebugResource.java | 17 +-
8 files changed, 421 insertions(+), 70 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
index 78f3bda..dea717c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
@@ -64,7 +64,12 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
@Override
public long getLong(int row) {
- return _values[row];
+ if (row >= 0 && row < _totalDocSize) {
+ return _values[row];
+ } else {
+ throw new RuntimeException(String.format("trying to fetch row %d while we only have total row count %d", row,
+ _totalDocSize));
+ }
}
@Override
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index e7b035f..fa045d1 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -32,6 +32,7 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* provides the storage abstraction for persisting upsert update event logs to local disk so we can reload them
@@ -44,6 +45,7 @@ public class SegmentUpdateLogStorageProvider {
protected final File _file;
@VisibleForTesting
protected final FileOutputStream _outputStream;
+ private AtomicInteger messageCountInFile = new AtomicInteger(0);
public SegmentUpdateLogStorageProvider(File file)
throws IOException {
@@ -60,6 +62,7 @@ public class SegmentUpdateLogStorageProvider {
ByteBuffer buffer = ByteBuffer.allocate(fileLength);
readFullyFromBeginning(_file, buffer);
int messageCount = fileLength / UpdateLogEntry.SIZE;
+ LOGGER.info("read {} messages from file {}", messageCount, _file.getName());
return new UpdateLogEntrySet(buffer, messageCount);
} else {
return UpdateLogEntrySet.getEmptySet();
@@ -74,7 +77,8 @@ public class SegmentUpdateLogStorageProvider {
buffer.flip();
_outputStream.write(buffer.array());
_outputStream.flush();
-
+ messageCountInFile.getAndAdd(messages.size());
+ LOGGER.debug("file {} message count {}", _file.getName(), messageCountInFile.get());
}
public synchronized void destroy() throws IOException {
@@ -102,6 +106,7 @@ public class SegmentUpdateLogStorageProvider {
segmentUpdateFile.length(), newSize);
channel.truncate(newSize);
channel.force(false);
+ messageCountInFile.set(Math.toIntExact(newSize / UpdateLogEntry.SIZE));
}
}
@@ -116,7 +121,7 @@ public class SegmentUpdateLogStorageProvider {
position += byteRead;
} while (byteRead != -1 && buffer.hasRemaining());
buffer.flip();
- LOGGER.info("read all data from segment update file {} to buffer in {} ms", segmentUpdateFile.getName(),
+ LOGGER.info("read {} bytes from segment update file {} to buffer in {} ms", position, segmentUpdateFile.getName(),
System.currentTimeMillis() - start);
}
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
index 0e38deb..a2b53ee 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageExplorer.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.IOException;
import java.util.Collection;
import java.util.Scanner;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* command line tool for debugging the pinot server by allowing us to interactively explore the update log data in pinot server/kc
@@ -52,24 +53,38 @@ public class UpdateLogStorageExplorer {
String[] inputSplits = input.split(" ");
Preconditions.checkState(inputSplits.length == 2, "expect input data to be 'tableName segmentName'");
String tableName = inputSplits[0];
+ if (!tableName.endsWith("_REALTIME")) {
+ tableName = tableName + "_REALTIME";
+ }
String segmentName = inputSplits[1];
provider.loadTable(tableName);
UpdateLogEntrySet updateLogEntrySet = provider.getAllMessages(tableName, segmentName);
- Multimap<Long, UpdateLogEntry> map = ArrayListMultimap.create();
+ Multimap<Long, UpdateLogAndPos> map = ArrayListMultimap.create();
System.out.println("update log size: " + updateLogEntrySet.size());
+ AtomicInteger pos = new AtomicInteger(0);
updateLogEntrySet.forEach(u -> {
- map.put(u.getOffset(), u);
+ map.put(u.getOffset(), new UpdateLogAndPos(u, pos.getAndIncrement()));
});
while (true) {
System.out.println("input the offset");
long offset = reader.nextLong();
- Collection<UpdateLogEntry> result = map.get(offset);
+ Collection<UpdateLogAndPos> result = map.get(offset);
System.out.println("associated update logs size: " + result.size());
- for (UpdateLogEntry entry: result) {
- System.out.println("content: " + entry.toString());
+ for (UpdateLogAndPos entry: result) {
+ System.out.println("content: " + entry.logEntry.toString() + " pos " + entry.pos);
}
}
}
+
+ static class UpdateLogAndPos {
+ public UpdateLogEntry logEntry;
+ public int pos;
+
+ public UpdateLogAndPos(UpdateLogEntry entry, int pos) {
+ this.logEntry = entry;
+ this.pos = pos;
+ }
+ }
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
index b7fcdb8..d0667b8 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java
@@ -130,9 +130,9 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
throw new RuntimeException("unexpected forward reader type for kafka offset column " + reader.getClass());
}
LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
+ LOGGER.info("immutable segment {} built offset map with minOffset {} and maxOffset {}", _segmentName, minOffset, maxOffset);
}
- @Override
public void postProcessRecords(GenericRow row, int docId) {
// do nothing
}
@@ -150,7 +150,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
start = System.currentTimeMillis();
- final long maxOffset = _totalDoc + _minSourceOffset;
+ final long maxOffset = _minSourceOffset + _sourceOffsetToDocIdArray.length;
int unmatchedLogEntryCount = 0;
try {
Map<Integer, Long> partitionToHighestWatermark = new HashMap<>();
@@ -193,13 +193,19 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
@Override
public void updateVirtualColumn(Iterable<UpdateLogEntry> logEntries) {
for (UpdateLogEntry logEntry: logEntries) {
- boolean updated = false;
- int docId = getDocIdFromSourceOffset(logEntry.getOffset());
- for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
- updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
- }
- if (updated) {
- _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+ try {
+ boolean updated = false;
+ int docId = getDocIdFromSourceOffset(logEntry.getOffset());
+ if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+ for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+ updated = readerWriter.update(docId, logEntry.getValue(), logEntry.getType()) || updated;
+ }
+ if (updated) {
+ _upsertWatermarkManager.processMessage(_tableNameWithType, _segmentName, logEntry);
+ }
+ }
+ } catch (Exception ex) {
+ LOGGER.warn("failed to update virtual column, skipping the current record {}", logEntries.toString());
}
}
}
@@ -207,11 +213,15 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
@Override
public String getVirtualColumnInfo(long offset) {
int docId = getDocIdFromSourceOffset(offset);
- StringBuilder result = new StringBuilder("matched: ");
- for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
- result.append(readerWriter.getInt(docId)).append("; ");
+ if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+ StringBuilder result = new StringBuilder("matched: ");
+ for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
+ result.append(readerWriter.getLong(docId)).append("; ");
+ }
+ return result.toString();
+ } else {
+ return "cannot found doc matching offset";
}
- return result.toString();
}
/**
@@ -222,14 +232,14 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback
*/
private int getDocIdFromSourceOffset(long offset) throws RuntimeException {
if (offset < _minSourceOffset || offset - _minSourceOffset >= _sourceOffsetToDocIdArray.length) {
- LOGGER.error("offset {} is outside range for current segment {} start offset {} size {}",
+ LOGGER.warn("offset {} is outside of range for current segment {} start offset {} size {}",
offset, _segmentName, _minSourceOffset, _sourceOffsetToDocIdArray.length);
- throw new RuntimeException("offset outside range");
+ return DEFAULT_DOC_ID_FOR_MISSING_ENTRY;
} else {
int position = Math.toIntExact(offset - _minSourceOffset);
if (_sourceOffsetToDocIdArray[position] == DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
- LOGGER.error("no docId associated with offset {} for segment {}", offset, _segmentName);
- throw new RuntimeException("docId not found");
+ LOGGER.warn("no docId associated with offset {} for segment {}", offset, _segmentName);
+ return DEFAULT_DOC_ID_FOR_MISSING_ENTRY;
} else {
return _sourceOffsetToDocIdArray[position];
}
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
index 5f0aa43..f380afa 100644
--- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.segment.updater;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
-import io.netty.util.internal.ConcurrentSet;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants;
@@ -36,6 +35,7 @@ import org.apache.pinot.grigio.common.rpcQueue.QueueConsumerRecord;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager;
+import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogTableRetentionManager;
import org.apache.pinot.grigio.common.utils.CommonUtils;
import org.apache.pinot.grigio.servers.SegmentUpdaterProvider;
import org.slf4j.Logger;
@@ -46,7 +46,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -75,7 +74,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
private final String _topicPrefix;
private final ExecutorService _ingestionExecutorService;
private final QueueConsumer _consumer;
- private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
+ private final SegmentUpdaterDataManagerHolder _dataManagersHolder = new SegmentUpdaterDataManagerHolder();
private final Map<String, Map<Integer, Long>> _tablePartitionCreationTime = new ConcurrentHashMap<>();
private final UpdateLogStorageProvider _updateLogStorageProvider;
private final UpdateLogRetentionManager _retentionManager;
@@ -105,7 +104,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
public void start() {
- String listOfTables = Joiner.on(",").join(_tableSegmentMap.keySet());
+ String listOfTables = Joiner.on(",").join(_dataManagersHolder.getAllTables());
LOGGER.info("starting segment updater main loop with the following table in server: {}", listOfTables);
_ingestionExecutorService.submit(this::updateLoop);
}
@@ -162,10 +161,9 @@ public class SegmentUpdater implements SegmentDeletionListener {
for (Map.Entry<String, TableUpdateLogs> entry : tableSegmentToUpdateLogs.entrySet()) {
String tableName = TableNameBuilder.ensureTableNameWithType(entry.getKey(), CommonConstants.Helix.TableType.REALTIME);
int tableMessageCount = 0;
- if (_tableSegmentMap.containsKey(tableName)) {
- final Map<String, Set<DataManagerCallback>> segmentManagersMap = _tableSegmentMap.get(tableName);
+ if (_dataManagersHolder.hasTable(tableName)) {
final TableUpdateLogs segment2UpdateLogsMap = entry.getValue();
- updateSegmentVirtualColumns(tableName, segmentManagersMap, segment2UpdateLogsMap, timeToStoreUpdateLogs);
+ updateSegmentVirtualColumns(tableName, segment2UpdateLogsMap, timeToStoreUpdateLogs);
} else {
LOGGER.warn("got messages for table {} not in this server", tableName);
}
@@ -202,12 +200,11 @@ public class SegmentUpdater implements SegmentDeletionListener {
/**
* Update the virtual columns of affected segments of a table.
*/
- private void updateSegmentVirtualColumns(String tableName, Map<String, Set<DataManagerCallback>> segmentManagersMap,
- TableUpdateLogs segment2UpdateLogsMap, AtomicLong timeToStoreUpdateLogs) throws IOException{
+ private void updateSegmentVirtualColumns(String tableName, TableUpdateLogs segment2UpdateLogsMap,
+ AtomicLong timeToStoreUpdateLogs) throws IOException{
for (Map.Entry<String, List<UpdateLogEntry>> segmentEntry : segment2UpdateLogsMap.getSegments2UpdateLog().entrySet()) {
final String segmentNameStr = segmentEntry.getKey();
updateVirtualColumn(tableName, segmentNameStr,
- segmentManagersMap.computeIfAbsent(segmentNameStr, sn -> new ConcurrentSet<>()),
segment2UpdateLogsMap.get(segmentNameStr), timeToStoreUpdateLogs);
}
}
@@ -217,19 +214,21 @@ public class SegmentUpdater implements SegmentDeletionListener {
* from consuming to online (mutable segment to immutable segment). In most cases we expect only one segment manager
* in this set of UpsertSegmentDataManager
*/
- private void updateVirtualColumn(String table, String segment, Set<DataManagerCallback> segmentDataManagers,
+ private void updateVirtualColumn(String table, String segment,
List<UpdateLogEntry> messages, AtomicLong timeToStoreUpdateLogs) throws IOException {
+ Set<DataManagerCallback> dataManagers = _dataManagersHolder.getDataManagers(table, segment);
LOGGER.debug("updating segment {} with {} results for {} data managers", segment, messages.size(),
- segmentDataManagers.size());
- if (segmentDataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) {
+ dataManagers.size());
+ if (dataManagers.size() > 0 || _retentionManager.getRetentionManagerForTable(table).shouldIngestForSegment(segment)) {
storeUpdateLogs(table, segment, messages, timeToStoreUpdateLogs);
}
try {
- for (DataManagerCallback dataManager: segmentDataManagers) {
+ // refetch the data managers from holder in case there are updates
+ for (DataManagerCallback dataManager: _dataManagersHolder.getDataManagers(table, segment)) {
dataManager.updateVirtualColumns(messages);
}
} catch (Exception ex) {
- LOGGER.error("failed to update virtual column for key ", ex);
+ LOGGER.error("failed to update virtual column for key", ex);
}
}
@@ -243,14 +242,11 @@ public class SegmentUpdater implements SegmentDeletionListener {
DataManagerCallback dataManager) {
// TODO get partition assignment from
LOGGER.info("segment updater adding table {} segment {}", tableNameWithType, segmentName.getSegmentName());
- if (!_tableSegmentMap.containsKey(tableNameWithType)) {
- synchronized (_tableSegmentMap) {
- _tableSegmentMap.put(tableNameWithType, new ConcurrentHashMap<>());
- }
+ if (!_dataManagersHolder.hasTable(tableNameWithType)) {
LOGGER.info("adding table {} to segment updater consumer", tableNameWithType);
handleNewTableInServer(tableNameWithType);
}
- _tableSegmentMap.get(tableNameWithType).computeIfAbsent(segmentName.getSegmentName(), sn -> new HashSet<>()).add(dataManager);
+ _dataManagersHolder.addDataManager(tableNameWithType, segmentName.getSegmentName(), dataManager);
synchronized (_tablePartitionCreationTime) {
long creationTime = _tablePartitionCreationTime.computeIfAbsent(tableNameWithType, t -> new ConcurrentHashMap<>())
.computeIfAbsent(segmentName.getPartitionId(), p -> segmentName.getCreationTimeStamp());
@@ -262,16 +258,7 @@ public class SegmentUpdater implements SegmentDeletionListener {
public synchronized void removeSegmentDataManager(String tableNameWithType, String segmentName,
DataManagerCallback toDeleteManager) {
LOGGER.info("segment updater removing table {} segment {}", tableNameWithType, segmentName);
- Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableNameWithType);
- if (segmentMap != null) {
- Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
- if (segmentDataManagers != null) {
- segmentDataManagers.remove(toDeleteManager);
- if (segmentDataManagers.size() == 0) {
- segmentMap.remove(segmentName);
- }
- }
- }
+ _dataManagersHolder.removeDataManager(tableNameWithType, segmentName, toDeleteManager);
}
/**
@@ -300,22 +287,23 @@ public class SegmentUpdater implements SegmentDeletionListener {
@Override
public synchronized void onSegmentDeletion(String tableNameWithType, String segmentName) {
LOGGER.info("deleting segment virtual column from local storage for table {} segment {}", tableNameWithType, segmentName);
- Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableNameWithType);
- if (segmentManagerMap != null) {
- if (segmentManagerMap.containsKey(segmentName) && segmentManagerMap.get(segmentName).size() > 0) {
- LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
- }
- try {
- segmentManagerMap.remove(segmentName);
- _retentionManager.getRetentionManagerForTable(tableNameWithType).notifySegmentDeletion(tableNameWithType);
- _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName);
- } catch (IOException e) {
- throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e);
+ if (_dataManagersHolder.hasTable(tableNameWithType)) {
+ boolean result = _dataManagersHolder.removeAllDataManagerForSegment(tableNameWithType, segmentName);
+ if (result) {
+ try {
+ UpdateLogTableRetentionManager retentionManager = _retentionManager.getRetentionManagerForTable(tableNameWithType);
+ if (retentionManager != null) {
+ retentionManager.notifySegmentDeletion(segmentName);
+ }
+ _updateLogStorageProvider.removeSegment(tableNameWithType, segmentName);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("failed to delete table %s segment %s", tableNameWithType, segmentName), e);
+ }
}
- if (segmentManagerMap.size() == 0) {
- _tableSegmentMap.remove(tableNameWithType);
+ if (_dataManagersHolder.maybeRemoveTable(tableNameWithType)) {
handleTableRemovalInServer(tableNameWithType);
}
+
} else {
LOGGER.error("deleting a segment {}:{} from current server but don't have segment map on updater",
tableNameWithType, segmentName);
diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java
new file mode 100644
index 0000000..4f6bd50
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolder.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks the {@link DataManagerCallback}s registered per table/segment on behalf of the segment
+ * updater, so update-log entries can be routed to the data managers currently owning each segment.
+ * All mutating methods are synchronized; the backing map is a {@link ConcurrentHashMap}, so the
+ * unsynchronized read paths ({@link #hasTable} and {@link #getAllTables}) remain safe.
+ */
+@ThreadSafe
+public class SegmentUpdaterDataManagerHolder {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentUpdaterDataManagerHolder.class);
+
+ private final Map<String, Map<String, Set<DataManagerCallback>>> _tableSegmentMap = new ConcurrentHashMap<>();
+
+ public SegmentUpdaterDataManagerHolder() {}
+
+ /**
+ * Fetches all tables containing at least one data manager entry on this server.
+ * @return an immutable snapshot of the table names known to this holder
+ */
+ public Set<String> getAllTables() {
+ return ImmutableSet.copyOf(_tableSegmentMap.keySet());
+ }
+
+ /**
+ * Checks if the given table is known to this holder. Note that this returns true even when the
+ * table's segment map has become empty (until {@link #maybeRemoveTable} prunes it).
+ * @param tableName the table name (with type suffix) to look up
+ */
+ public boolean hasTable(String tableName) {
+ return _tableSegmentMap.containsKey(tableName);
+ }
+
+ /**
+ * Gets the set of data managers for the given table name and segment name.
+ * @return an immutable copy of the registered data managers; an empty set if the table or
+ * segment is unknown (an error is logged for an unknown table)
+ */
+ public synchronized Set<DataManagerCallback> getDataManagers(String tableName, String segmentName) {
+ if (!_tableSegmentMap.containsKey(tableName)) {
+ LOGGER.error("try to fetch data manager for non-existing table {} segment {}", tableName, segmentName);
+ } else {
+ final Map<String, Set<DataManagerCallback>> segmentDataManagerMap = _tableSegmentMap.get(tableName);
+ if (segmentDataManagerMap.containsKey(segmentName)) {
+ return ImmutableSet.copyOf(segmentDataManagerMap.get(segmentName));
+ }
+ }
+ return ImmutableSet.of();
+ }
+
+ /**
+ * Adds a data manager for the given table and segment name, creating the per-table and
+ * per-segment containers on first use.
+ */
+ public synchronized void addDataManager(String tableName, String segmentName, DataManagerCallback dataManager) {
+ LOGGER.info("adding new data manager to updater for table {}, segment {}", tableName, segmentName);
+ if (!_tableSegmentMap.containsKey(tableName)) {
+ _tableSegmentMap.put(tableName, new ConcurrentHashMap<>());
+ }
+ _tableSegmentMap.get(tableName).computeIfAbsent(segmentName, sn -> ConcurrentHashMap.newKeySet()).add(dataManager);
+ }
+
+ /**
+ * Removes a specific data manager for the given table and segment name; does nothing if the
+ * table, segment, or manager is unknown. The segment entry is dropped once its last data
+ * manager has been removed.
+ */
+ public synchronized void removeDataManager(String tableName, String segmentName,
+ DataManagerCallback toDeleteManager) {
+ Map<String, Set<DataManagerCallback>> segmentMap = _tableSegmentMap.get(tableName);
+ if (segmentMap != null) {
+ Set<DataManagerCallback> segmentDataManagers = segmentMap.get(segmentName);
+ if (segmentDataManagers != null) {
+ segmentDataManagers.remove(toDeleteManager);
+ LOGGER.info("removing data manager for table {} segment {}", tableName, segmentName);
+ if (segmentDataManagers.size() == 0) {
+ segmentMap.remove(segmentName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Removes all data managers for the given table and segment.
+ * NOTE(review): the error log below fires whenever the segment key is present, even with zero
+ * data managers attached — the message suggests it was intended only for a non-empty set
+ * (the pre-refactor code checked {@code size() > 0}); confirm.
+ * @return true if the segment entry existed (i.e. something was removed), false otherwise
+ */
+ public synchronized boolean removeAllDataManagerForSegment(String tableName, String segmentName) {
+ Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName);
+ if (segmentManagerMap != null) {
+ if (segmentManagerMap.containsKey(segmentName)) {
+ LOGGER.error("trying to remove segment storage with {} segment data manager", segmentManagerMap.get(segmentName).size());
+ }
+ Set<DataManagerCallback> result = segmentManagerMap.remove(segmentName);
+ return result != null;
+ }
+ return false;
+ }
+
+ /**
+ * Removes the table from the cache if it no longer has any associated data managers.
+ * @return true if the given table was removed, false otherwise
+ */
+ public synchronized boolean maybeRemoveTable(String tableName) {
+ Map<String, Set<DataManagerCallback>> segmentManagerMap = _tableSegmentMap.get(tableName);
+ if (segmentManagerMap != null && segmentManagerMap.size() == 0) {
+ _tableSegmentMap.remove(tableName);
+ return true;
+ }
+ return false;
+ }
+
+}
diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java
new file mode 100644
index 0000000..ca7aceb
--- /dev/null
+++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/segment/updater/SegmentUpdaterDataManagerHolderTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.updater;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.pinot.core.data.manager.upsert.DataManagerCallback;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+
+public class SegmentUpdaterDataManagerHolderTest {
+ private SegmentUpdaterDataManagerHolder emptyHolder;
+ private SegmentUpdaterDataManagerHolder dataManagerHolder;
+
+ private DataManagerCallback dummyManager1;
+ private DataManagerCallback dummyManager2;
+ private DataManagerCallback dummyManager3;
+
+ @BeforeMethod
+ public void setUp() {
+ emptyHolder = new SegmentUpdaterDataManagerHolder();
+ dataManagerHolder = new SegmentUpdaterDataManagerHolder();
+ dummyManager1 = mock(DataManagerCallback.class);
+ dummyManager2 = mock(DataManagerCallback.class);
+ dummyManager3 = mock(DataManagerCallback.class);
+ dataManagerHolder.addDataManager("table", "segment1", dummyManager1);
+ dataManagerHolder.addDataManager("table", "segment2", dummyManager2);
+ dataManagerHolder.addDataManager("table2", "segment3", dummyManager3);
+ }
+
+ @Test
+ public void testGetAllTables() {
+ Set<String> tables = dataManagerHolder.getAllTables();
+ ensureSetEqual(tables, ImmutableSet.of("table", "table2"));
+
+ Assert.assertEquals(emptyHolder.getAllTables().size(), 0);
+ }
+
+ @Test
+ public void testHasTable() {
+ Assert.assertFalse(emptyHolder.hasTable("table"));
+
+ Assert.assertTrue(dataManagerHolder.hasTable("table"));
+ Assert.assertTrue(dataManagerHolder.hasTable("table2"));
+ Assert.assertFalse(dataManagerHolder.hasTable("table3"));
+ }
+
+ @Test
+ public void testGetDataManagers() {
+ Set<DataManagerCallback> dataManagers = dataManagerHolder.getDataManagers("table", "segment1");
+ ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager1));
+
+ dataManagers = dataManagerHolder.getDataManagers("table", "segment2");
+ ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager2));
+
+ dataManagers = dataManagerHolder.getDataManagers("table2", "segment3");
+ ensureSetEqual(dataManagers, ImmutableSet.of(dummyManager3));
+
+ // non exist tables/segments
+ dataManagers = dataManagerHolder.getDataManagers("table2", "segment1");
+ ensureSetEqual(dataManagers, ImmutableSet.of());
+
+ dataManagers = dataManagerHolder.getDataManagers("table3", "segment1");
+ ensureSetEqual(dataManagers, ImmutableSet.of());
+ }
+
+ @Test
+ public void testAddDataManager() {
+ // fresh mocks so additions are distinguishable from the fixture's seeded managers
+ DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+ dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+ dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+ // table2/segment1 had no managers before, so this creates the segment entry
+ dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+
+ // adding to an existing segment accumulates managers instead of replacing them
+ Set<DataManagerCallback> tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager1, dummyManager4));
+
+ tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment2");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager2, dummyManager5));
+
+ // the newly created segment entry holds only the manager just added
+ tableSegmentDMs = dataManagerHolder.getDataManagers("table2", "segment1");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager6));
+ }
+
+ @Test
+ public void testRemoveDataManager() {
+ // NOTE(review): dummyManager5/dummyManager6 are added but never asserted on in this test;
+ // they only pad the holder with unrelated entries that must survive the removals below
+ DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+ dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+ dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+ dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+ Set<DataManagerCallback> tableSegmentDMs;
+
+ // remove a present manager, a manager not registered under this segment, and a manager
+ // under a nonexistent segment — only the first should have any effect
+ dataManagerHolder.removeDataManager("table", "segment1", dummyManager1);
+ dataManagerHolder.removeDataManager("table", "segment1", dummyManager2);
+ // NOTE(review): "segment" (no numeric suffix) may be a typo for "segment1"; if
+ // intentional it exercises removal against a nonexistent segment — confirm
+ dataManagerHolder.removeDataManager("table", "segment", dummyManager2);
+ tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of(dummyManager4));
+
+ // removing the last manager leaves the segment resolving to an empty set
+ dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+ tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of());
+
+ // removal is idempotent: repeating the same removal neither throws nor changes state
+ dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+ tableSegmentDMs = dataManagerHolder.getDataManagers("table", "segment1");
+ ensureSetEqual(tableSegmentDMs, ImmutableSet.of());
+
+ // a segment emptied by removal can be repopulated and emptied again
+ dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+ ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of(dummyManager4));
+
+ dataManagerHolder.removeDataManager("table", "segment1", dummyManager4);
+ ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+ }
+
+ @Test
+ public void testRemoveAllDataManagerForSegment() {
+ // seed extra managers so table/segment1 holds more than one manager before the bulk removal
+ // NOTE(review): dummyManager5/dummyManager6 pad other segments and are not asserted on here
+ DataManagerCallback dummyManager4 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager5 = mock(DataManagerCallback.class);
+ DataManagerCallback dummyManager6 = mock(DataManagerCallback.class);
+ dataManagerHolder.addDataManager("table", "segment1", dummyManager4);
+ dataManagerHolder.addDataManager("table", "segment2", dummyManager5);
+ dataManagerHolder.addDataManager("table2", "segment1", dummyManager6);
+
+ // bulk removal clears every manager for the segment and reports true (something was removed)
+ boolean result = dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+ ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+ Assert.assertTrue(result);
+
+ // repeating the bulk removal on the now-empty segment reports false
+ result = dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+ ensureSetEqual(dataManagerHolder.getDataManagers("table", "segment1"), ImmutableSet.of());
+ Assert.assertFalse(result);
+
+ // a nonexistent table also reports false rather than throwing
+ result = dataManagerHolder.removeAllDataManagerForSegment("table3", "segment1");
+ Assert.assertFalse(result);
+ }
+
+ @Test
+ public void testMaybeRemoveTable() {
+
+ // a table still holding segments (or one that never existed) is not removed
+ Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table"));
+ Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table2"));
+ Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table4"));
+
+ // once its only segment is cleared, table2 becomes removable
+ dataManagerHolder.removeAllDataManagerForSegment("table2", "segment3");
+ Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table2"));
+
+ // "table" holds two segments (presumably segment1 and segment2 per the fixture — confirm):
+ // clearing just one is not enough; clearing both makes it removable
+ dataManagerHolder.removeAllDataManagerForSegment("table", "segment1");
+ Assert.assertFalse(dataManagerHolder.maybeRemoveTable("table"));
+ dataManagerHolder.removeAllDataManagerForSegment("table", "segment2");
+ Assert.assertTrue(dataManagerHolder.maybeRemoveTable("table"));
+ }
+
+ // Asserts the two sets are equal: same size plus set2 containing every element of set1
+ // implies mutual containment. NOTE(review): TestNG's assertEquals takes (actual, expected),
+ // so on size mismatch the failure message labels set2's size as "actual" — confirm the
+ // intended argument order; Assert.assertEquals(set1, set2) on the sets would be simpler.
+ private <T> void ensureSetEqual(Set<T> set1, Set<T> set2) {
+ Assert.assertEquals(set2.size(), set1.size());
+ for (T o: set1) {
+ Assert.assertTrue(set2.contains(o));
+ }
+ }
+}
\ No newline at end of file
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
index 13a7612..eb1f3ea 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java
@@ -26,19 +26,23 @@ import io.swagger.annotations.ApiResponses;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.TableDataManager;
-import org.apache.pinot.core.data.manager.UpsertSegmentDataManager;
import org.apache.pinot.server.starter.ServerInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
@Api(tags = "UpsertDebug")
@Path("/")
public class UpsertDebugResource {
+ private static final Logger LOGGER = LoggerFactory.getLogger(UpsertDebugResource.class);
@Inject
ServerInstance serverInstance;
@@ -64,6 +68,7 @@ public class UpsertDebugResource {
if (tableDataManager == null) {
return "no table for " + tableName;
}
+ /*
SegmentDataManager segmentDataManager = null;
try {
segmentDataManager = tableDataManager.acquireSegment(segmentName);
@@ -73,12 +78,20 @@ public class UpsertDebugResource {
if (!(segmentDataManager instanceof UpsertSegmentDataManager)) {
return "it is not an upsert table";
} else {
- return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr));
+ long offset = Long.parseLong(offsetStr);
+ LOGGER.info("getting virtual column for table {} segment {} offset {}", tableName, segmentName, offset);
+ return ( segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr));
}
+ } catch (Exception ex) {
+ LOGGER.error("failed to fetch virtual column info", ex);
+ throw new WebApplicationException("Failed to fetch virtual column info" + ex.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR);
} finally {
if (segmentDataManager != null) {
tableDataManager.releaseSegment(segmentDataManager);
}
}
+ */
+ return "";
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org