You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2020/05/27 11:58:12 UTC

[asterixdb] 03/03: Merge branch 'gerrit/mad-hatter'

This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 8fd7d467ce073a179204107954ed8d475dbed125
Merge: 9627b2a 1d6f811
Author: Michael Blow <mb...@apache.org>
AuthorDate: Tue May 26 15:34:42 2020 -0400

    Merge branch 'gerrit/mad-hatter'
    
    Change-Id: I1ed2ad00b1fb6ef5fe70e7a1d6a753d8da59e269

 .../org/apache/asterix/app/nc/HaltCallback.java    |  5 +-
 .../PersistentLocalResourceRepositoryTest.java     | 53 ++++++++++++++++++++++
 .../common/context/GlobalVirtualBufferCache.java   |  2 +-
 .../ioopcallbacks/LSMIOOperationCallback.java      | 32 ++++++++++++-
 .../asterix/common/storage/ResourceReference.java  | 14 ++++++
 .../replication/messaging/ComponentMaskTask.java   |  4 +-
 .../PersistentLocalResourceRepository.java         | 14 +-----
 .../java/org/apache/hyracks/util/ExitUtil.java     |  2 +-
 .../main/java/org/apache/hyracks/util/Span.java    | 21 ++++++++-
 9 files changed, 123 insertions(+), 24 deletions(-)

diff --cc asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
index e3bd13d,0000000..45594eb
mode 100644,000000..100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/GlobalVirtualBufferCache.java
@@@ -1,517 -1,0 +1,517 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.common.context;
 +
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.asterix.common.config.StorageProperties;
 +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 +import org.apache.hyracks.api.exceptions.HyracksDataException;
 +import org.apache.hyracks.api.io.FileReference;
 +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 +import org.apache.hyracks.api.replication.IIOReplicationManager;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
 +import org.apache.hyracks.storage.am.lsm.common.impls.VirtualBufferCache;
 +import org.apache.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
 +import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 +import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper;
 +import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
 +import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
 +import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
 +import org.apache.hyracks.storage.common.buffercache.VirtualPage;
 +import org.apache.hyracks.storage.common.file.BufferedFileHandle;
 +import org.apache.hyracks.storage.common.file.IFileMapManager;
 +import org.apache.hyracks.util.ExitUtil;
 +import org.apache.logging.log4j.LogManager;
 +import org.apache.logging.log4j.Logger;
 +
 +import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 +import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
 +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 +
 +public class GlobalVirtualBufferCache implements IVirtualBufferCache, ILifeCycleComponent {
 +    private static final Logger LOGGER = LogManager.getLogger();
 +
 +    // keep track of the memory usage of each filtered memory component
 +    private final Map<ILSMMemoryComponent, AtomicInteger> memoryComponentUsageMap =
 +            Collections.synchronizedMap(new HashMap<>());
 +    private final Map<FileReference, AtomicInteger> fileRefUsageMap = Collections.synchronizedMap(new HashMap<>());
 +    private final Int2ObjectMap<AtomicInteger> fileIdUsageMap =
 +            Int2ObjectMaps.synchronize(new Int2ObjectOpenHashMap<>());
 +
 +    private final List<ILSMIndex> primaryIndexes = new ArrayList<>();
 +    private volatile int flushPtr;
 +    private volatile ILSMIndex flushingIndex;
 +
 +    private final int filteredMemoryComponentMaxNumPages;
 +    private final int flushPageBudget;
 +    private final VirtualBufferCache vbc;
 +    private final AtomicBoolean isOpen = new AtomicBoolean(false);
 +    private final FlushThread flushThread = new FlushThread();
 +
 +    public GlobalVirtualBufferCache(ICacheMemoryAllocator allocator, StorageProperties storageProperties) {
 +        this.vbc = new VirtualBufferCache(allocator, storageProperties.getBufferCachePageSize(),
 +                (int) (storageProperties.getMemoryComponentGlobalBudget()
 +                        / storageProperties.getMemoryComponentPageSize()));
 +        this.flushPageBudget = (int) (storageProperties.getMemoryComponentGlobalBudget()
 +                / storageProperties.getMemoryComponentPageSize()
 +                * storageProperties.getMemoryComponentFlushThreshold());
 +        this.filteredMemoryComponentMaxNumPages = storageProperties.getFilteredMemoryComponentMaxNumPages();
 +    }
 +
 +    @Override
 +    public int getPageSize() {
 +        return vbc.getPageSize();
 +    }
 +
 +    @Override
 +    public int getPageSizeWithHeader() {
 +        return vbc.getPageSizeWithHeader();
 +    }
 +
 +    @Override
 +    public synchronized void register(ILSMMemoryComponent memoryComponent) {
 +        ILSMIndex index = memoryComponent.getLsmIndex();
 +        if (index.isPrimaryIndex()) {
 +            if (!primaryIndexes.contains(index)) {
 +                // make sure only add index once
 +                primaryIndexes.add(index);
 +                if (LOGGER.isInfoEnabled()) {
 +                    LOGGER.info("Registered {} index {} to the global VBC",
 +                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
 +                }
 +            }
 +            if (index.getNumOfFilterFields() > 0) {
 +                // handle filtered primary index
 +                AtomicInteger usage = new AtomicInteger();
 +                memoryComponentUsageMap.put(memoryComponent, usage);
 +                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
 +                    if (ref != null) {
 +                        fileRefUsageMap.put(ref, usage);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public synchronized void unregister(ILSMMemoryComponent memoryComponent) {
 +        ILSMIndex index = memoryComponent.getLsmIndex();
 +        if (index.isPrimaryIndex()) {
 +            int pos = primaryIndexes.indexOf(index);
 +            if (pos >= 0) {
 +                primaryIndexes.remove(index);
 +                if (LOGGER.isInfoEnabled()) {
 +                    LOGGER.info("Unregistered {} index {} to the global VBC",
 +                            isMetadataIndex(index) ? "metadata" : "primary", index.toString());
 +                }
 +                if (primaryIndexes.isEmpty()) {
 +                    flushPtr = 0;
 +                } else if (flushPtr > pos) {
 +                    // If the removed index is before flushPtr, we should decrement flushPtr by 1 so that
 +                    // it still points to the same index.
 +                    flushPtr = (flushPtr - 1) % primaryIndexes.size();
 +                }
 +            }
 +            if (index.getNumOfFilterFields() > 0) {
 +                memoryComponentUsageMap.remove(memoryComponent);
 +                for (FileReference ref : memoryComponent.getComponentFileRefs().getFileReferences()) {
 +                    if (ref != null) {
 +                        fileRefUsageMap.remove(ref);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void flushed(ILSMMemoryComponent memoryComponent) throws HyracksDataException {
 +        if (memoryComponent.getLsmIndex() == flushingIndex) {
 +            synchronized (this) {
 +                if (memoryComponent.getLsmIndex() == flushingIndex) {
 +                    flushingIndex = null;
 +                    // After the flush operation is completed, we may have 2 cases:
 +                    // 1. there is no active reader on this memory component and memory is reclaimed;
 +                    // 2. there are still some active readers and memory cannot be reclaimed.
 +                    // But for both cases, we will notify all primary index op trackers to let their writers retry,
 +                    // if they have been blocked. Moreover, we will check whether more flushes are needed.
 +                    final int size = primaryIndexes.size();
 +                    for (int i = 0; i < size; i++) {
 +                        ILSMOperationTracker opTracker = primaryIndexes.get(i).getOperationTracker();
 +                        synchronized (opTracker) {
 +                            opTracker.notifyAll();
 +                        }
 +                    }
 +
 +                    if (LOGGER.isInfoEnabled()) {
 +                        LOGGER.info("Completed flushing {}. Resetting flushIndex back to null.",
 +                                memoryComponent.getIndex().toString());
 +                    }
 +                }
 +            }
 +            checkAndNotifyFlushThread();
 +        }
 +        if (memoryComponent.getLsmIndex().getNumOfFilterFields() > 0
 +                && memoryComponent.getLsmIndex().isPrimaryIndex()) {
 +            AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
 +            if (usage != null) {
 +                // reset usage to 0 after the memory component is flushed
 +                usage.set(0);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public int getPageBudget() {
 +        return vbc.getPageBudget();
 +    }
 +
 +    @Override
 +    public boolean isFull() {
 +        return vbc.isFull();
 +    }
 +
 +    @Override
 +    public boolean isFull(ILSMMemoryComponent memoryComponent) {
 +        return memoryComponent.getLsmIndex() == flushingIndex || isFilteredMemoryComponentFull(memoryComponent);
 +    }
 +
 +    private boolean isFilteredMemoryComponentFull(ILSMMemoryComponent memoryComponent) {
 +        if (filteredMemoryComponentMaxNumPages <= 0 || memoryComponent.getLsmIndex().getNumOfFilterFields() == 0
 +                || !memoryComponent.getLsmIndex().isPrimaryIndex()) {
 +            return false;
 +        }
 +        AtomicInteger usage = memoryComponentUsageMap.get(memoryComponent);
 +        return usage.get() >= filteredMemoryComponentMaxNumPages;
 +    }
 +
 +    @Override
 +    public int createFile(FileReference fileRef) throws HyracksDataException {
 +        int fileId = vbc.createFile(fileRef);
 +        updateFileIdUsageMap(fileRef, fileId);
 +        return fileId;
 +    }
 +
 +    @Override
 +    public int openFile(FileReference fileRef) throws HyracksDataException {
 +        int fileId = vbc.openFile(fileRef);
 +        updateFileIdUsageMap(fileRef, fileId);
 +        return fileId;
 +    }
 +
 +    private void updateFileIdUsageMap(FileReference fileRef, int fileId) {
 +        AtomicInteger usage = fileRefUsageMap.get(fileRef);
 +        if (usage != null) {
 +            fileIdUsageMap.put(fileId, usage);
 +        }
 +    }
 +
 +    @Override
 +    public void openFile(int fileId) throws HyracksDataException {
 +        vbc.openFile(fileId);
 +    }
 +
 +    @Override
 +    public void closeFile(int fileId) throws HyracksDataException {
 +        vbc.closeFile(fileId);
 +    }
 +
 +    @Override
 +    public void deleteFile(FileReference fileRef) throws HyracksDataException {
 +        vbc.deleteFile(fileRef);
 +    }
 +
 +    @Override
 +    public void deleteFile(int fileId) throws HyracksDataException {
 +        vbc.deleteFile(fileId);
 +    }
 +
 +    @Override
 +    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
 +        ICachedPage page = vbc.pin(dpid, newPage);
 +        if (newPage) {
 +            incrementFilteredMemoryComponentUsage(dpid, 1);
 +            checkAndNotifyFlushThread();
 +        }
 +        return page;
 +    }
 +
 +    private void incrementFilteredMemoryComponentUsage(long dpid, int pages) {
 +        if (filteredMemoryComponentMaxNumPages > 0) {
 +            // update memory usage of filtered index
 +            AtomicInteger usage = fileIdUsageMap.get(BufferedFileHandle.getFileId(dpid));
 +            if (usage != null) {
 +                usage.addAndGet(pages);
 +                // We do not need extra code to flush this filtered memory component when it becomes full.
 +                // This method is only called when there are active writers on this memory component.
 +                // When the writer exits, it'll automatically flush this memory component when it finds out
 +                // that this memory component becomes full.
 +            }
 +        }
 +    }
 +
 +    private void checkAndNotifyFlushThread() {
 +        if (vbc.getUsage() < flushPageBudget || flushingIndex != null) {
 +            // For better performance, we only flush one dataset partition at a time.
 +            // After reclaiming memory from this dataset partition, its memory can be used by other indexes.
 +            // Thus, given N dataset partitions, each dataset partition will approximately receive 2/N of
 +            // the total memory instead of 1/N, which doubles the memory utilization.
 +            return;
 +        }
 +        // Notify the flush thread to schedule flushes. This is used to avoid deadlocks because page pins can be
 +        // called while synchronizing on op trackers.
 +        synchronized (flushThread.flushLock) {
 +            flushThread.flushLock.notifyAll();
 +        }
 +    }
 +
 +    @Override
 +    public void resizePage(ICachedPage cPage, int multiplier, IExtraPageBlockHelper extraPageBlockHelper)
 +            throws HyracksDataException {
 +        vbc.resizePage(cPage, multiplier, extraPageBlockHelper);
 +        int delta = multiplier - cPage.getFrameSizeMultiplier();
 +        incrementFilteredMemoryComponentUsage(((VirtualPage) cPage).dpid(), delta);
 +        if (delta > 0) {
 +            checkAndNotifyFlushThread();
 +        }
 +    }
 +
 +    @Override
 +    public void unpin(ICachedPage page) throws HyracksDataException {
 +        vbc.unpin(page);
 +    }
 +
 +    @Override
 +    public void flush(ICachedPage page) throws HyracksDataException {
 +        vbc.flush(page);
 +    }
 +
 +    @Override
 +    public void force(int fileId, boolean metadata) throws HyracksDataException {
 +        vbc.force(fileId, metadata);
 +    }
 +
 +    @Override
 +    public void open() throws HyracksDataException {
 +
 +    }
 +
 +    @Override
 +    public void close() throws HyracksDataException {
 +        // no op
 +    }
 +
 +    @Override
 +    public void start() {
 +        if (isOpen.compareAndSet(false, true)) {
 +            try {
 +                vbc.open();
 +            } catch (HyracksDataException e) {
 +                throw new IllegalStateException("Fail to open virtual buffer cache ", e);
 +            }
 +            flushThread.start();
 +        }
 +    }
 +
 +    @Override
 +    public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
 +        if (isOpen.compareAndSet(true, false)) {
 +            if (dumpState) {
 +                dumpState(ouputStream);
 +            }
 +            vbc.close();
 +            synchronized (flushThread.flushLock) {
 +                flushThread.flushLock.notifyAll();
 +            }
 +            try {
 +                flushThread.join();
 +            } catch (InterruptedException e) {
 +                Thread.currentThread().interrupt();
 +                throw HyracksDataException.create(e);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void dumpState(OutputStream os) throws IOException {
 +        os.write(vbc.toString().getBytes());
 +    }
 +
 +    @Override
 +    public IFileMapManager getFileMapProvider() {
 +        return vbc.getFileMapProvider();
 +    }
 +
 +    @Override
 +    public int getNumPagesOfFile(int fileId) throws HyracksDataException {
 +        return vbc.getNumPagesOfFile(fileId);
 +    }
 +
 +    @Override
 +    public void returnPage(ICachedPage page) {
 +        vbc.returnPage(page);
 +    }
 +
 +    @Override
 +    public IFIFOPageWriter createFIFOWriter(IPageWriteCallback callback, IPageWriteFailureCallback failureCallback) {
 +        return vbc.createFIFOWriter(callback, failureCallback);
 +    }
 +
 +    @Override
 +    public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
 +        return vbc.confiscatePage(dpid);
 +    }
 +
 +    @Override
 +    public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId)
 +            throws HyracksDataException {
 +        return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId);
 +    }
 +
 +    @Override
 +    public void returnPage(ICachedPage page, boolean reinsert) {
 +        vbc.returnPage(page, reinsert);
 +    }
 +
 +    @Override
 +    public int getFileReferenceCount(int fileId) {
 +        return vbc.getFileReferenceCount(fileId);
 +    }
 +
 +    @Override
 +    public boolean isReplicationEnabled() {
 +        return vbc.isReplicationEnabled();
 +    }
 +
 +    @Override
 +    public IIOReplicationManager getIOReplicationManager() {
 +        return vbc.getIOReplicationManager();
 +    }
 +
 +    @Override
 +    public void purgeHandle(int fileId) throws HyracksDataException {
 +        vbc.purgeHandle(fileId);
 +    }
 +
 +    @Override
 +    public String toString() {
 +        return vbc.toString();
 +    }
 +
 +    @Override
 +    public void closeFileIfOpen(FileReference fileRef) {
 +        vbc.closeFileIfOpen(fileRef);
 +    }
 +
 +    @Override
 +    public int getUsage() {
 +        return vbc.getUsage();
 +    }
 +
 +    private boolean isMetadataIndex(ILSMIndex index) {
 +        BaseOperationTracker opTracker = (BaseOperationTracker) index.getOperationTracker();
 +        return MetadataIndexImmutableProperties.isMetadataDataset(opTracker.getDatasetInfo().getDatasetID());
 +    }
 +
 +    /**
 +     * We use a dedicated thread to schedule flushes to avoid deadlock. We cannot schedule flushes directly during
 +     * page pins because page pins can be called while synchronized on op trackers (e.g., when resetting a
 +     * memory component).
 +     */
 +    private class FlushThread extends Thread {
 +        private final Object flushLock = new Object();
 +
 +        @Override
 +        public void run() {
 +            while (isOpen.get()) {
 +                synchronized (flushLock) {
 +                    try {
 +                        flushLock.wait();
 +                    } catch (InterruptedException e) {
 +                        LOGGER.error("Flushing thread is interrupted unexpectedly.", e);
 +                    }
 +                }
 +                if (isOpen.get()) {
 +                    try {
 +                        scheduleFlush();
 +                    } catch (Throwable e) {
 +                        LOGGER.error("Unexpected exception when trying to schedule flushes.", e);
-                         ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED);
++                        ExitUtil.halt(ExitUtil.EC_IO_SCHEDULER_FAILED);
 +                    }
 +                }
 +            }
 +        }
 +
 +        private void scheduleFlush() throws HyracksDataException {
 +            synchronized (GlobalVirtualBufferCache.this) {
 +                int cycles = 0;
 +                while (vbc.getUsage() >= flushPageBudget && flushingIndex == null && cycles <= primaryIndexes.size()) {
 +                    // find the first modified memory component while avoiding infinite loops
 +                    while (cycles <= primaryIndexes.size()
 +                            && primaryIndexes.get(flushPtr).isCurrentMutableComponentEmpty()) {
 +                        flushPtr = (flushPtr + 1) % primaryIndexes.size();
 +                        cycles++;
 +                    }
 +
 +                    ILSMIndex primaryIndex = primaryIndexes.get(flushPtr);
 +                    flushPtr = (flushPtr + 1) % primaryIndexes.size();
 +                    // we need to manually flush this memory component because it may be idle at this point
 +                    // note that this is different from flushing a filtered memory component
 +                    PrimaryIndexOperationTracker opTracker =
 +                            (PrimaryIndexOperationTracker) primaryIndex.getOperationTracker();
 +                    synchronized (opTracker) {
 +                        boolean flushable = !primaryIndex.isCurrentMutableComponentEmpty();
 +                        if (flushable && !opTracker.isFlushLogCreated()) {
 +                            // if the flush log has already been created, then we can simply wait for
 +                            // that flush to complete
 +                            opTracker.setFlushOnExit(true);
 +                            opTracker.flushIfNeeded();
 +                            // If the flush cannot be scheduled at this time, then there must be active writers.
 +                            // The flush will be eventually scheduled when writers exit
 +                            if (LOGGER.isInfoEnabled()) {
 +                                LOGGER.info("Requested {} flushing primary index {}",
 +                                        isMetadataIndex(primaryIndex) ? "metadata" : "primary",
 +                                        primaryIndex.toString());
 +                            }
 +                        }
 +                        if ((flushable || opTracker.isFlushLogCreated()) && !isMetadataIndex(primaryIndex)) {
 +                            // global vbc cannot wait on metadata indexes because metadata indexes support full
 +                            // ACID transactions. Waiting on metadata indexes can introduce deadlocks.
 +                            flushingIndex = primaryIndex;
 +                            LOGGER.debug("Waiting for flushing primary index {} to complete...", primaryIndex);
 +                            break;
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +}