You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2021/04/07 12:47:52 UTC

[asterixdb] 11/25: [NO ISSUE][STO] Delete invalid indexes during cluster global recovery

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 17670aab184fe12fa30dc79376e819e07fac43c4
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Wed Mar 31 21:18:18 2021 +0300

    [NO ISSUE][STO] Delete invalid indexes during cluster global recovery
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Before starting cluster global recovery, send to all NCs valid
      dataset ids from the metadata node.
    - Delete any invalid indexes on NCs based on the metadata received
      from the CC.
    - Add a storage option to enable/disable global storage cleanup.
      This allows tests that create storage objects without using the
      metadata node to bypass global cleanup.
    - Add storage option to specify the timeout for nodes to perform
      global storage cleanup.
    - Add test case for global storage recovery.
    - Adapt existing test cases that require bypassing global cleanup.
    
    Change-Id: Idee73e57fa5879c3b9aab5f881bf848e225f874b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10784
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 .../app/message/StorageCleanupRequestMessage.java  | 108 +++++++++++++++++++++
 .../apache/asterix/app/message/VoidResponse.java   |  63 ++++++++++++
 .../hyracks/bootstrap/GlobalRecoveryManager.java   |  30 ++++++
 .../test/dataflow/GlobalStorageCleanupTest.java    |  68 +++++++++++++
 .../test/dataflow/LSMFlushRecoveryTest.java        |  12 ++-
 .../api/cluster_state_1/cluster_state_1.1.regexadm |   1 +
 .../cluster_state_1_full.1.regexadm                |   1 +
 .../cluster_state_1_less.1.regexadm                |   1 +
 .../asterix/common/config/StorageProperties.java   |  25 ++++-
 9 files changed, 304 insertions(+), 5 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
new file mode 100644
index 0000000..85269a1
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import static org.apache.hyracks.util.ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.messaging.CcIdentifiedMessage;
+import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.hyracks.util.ExitUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class StorageCleanupRequestMessage extends CcIdentifiedMessage implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Set<Integer> validDatasetIds;
+    private final long reqId;
+
+    public StorageCleanupRequestMessage(long reqId, Set<Integer> validDatasetIds) {
+        this.validDatasetIds = validDatasetIds;
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appContext) throws HyracksDataException, InterruptedException {
+        INCMessageBroker broker = (INCMessageBroker) appContext.getServiceContext().getMessageBroker();
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) appContext.getLocalResourceRepository();
+        Map<Long, LocalResource> localResources = localResourceRepository.loadAndGetAllResources();
+        for (LocalResource resource : localResources.values()) {
+            DatasetLocalResource lr = (DatasetLocalResource) resource.getResource();
+            if (MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())) {
+                // skip metadata indexes
+                continue;
+            }
+            if (!validDatasetIds.contains(lr.getDatasetId())) {
+                LOGGER.warn("found invalid index {} with dataset id {}", resource.getPath(), lr.getDatasetId());
+                deleteInvalidIndex(appContext, localResourceRepository, resource);
+            }
+        }
+        try {
+            broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
+        } catch (Exception e) {
+            LOGGER.error("failed to notify CC of storage clean up; halting...", e);
+            ExitUtil.halt(EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED);
+        }
+    }
+
+    private void deleteInvalidIndex(INcApplicationContext appContext,
+            PersistentLocalResourceRepository localResourceRepository, LocalResource resource)
+            throws HyracksDataException {
+        IDatasetLifecycleManager lcManager = appContext.getDatasetLifecycleManager();
+        String resourceRelPath = resource.getPath();
+        synchronized (lcManager) {
+            IIndex index;
+            index = lcManager.get(resourceRelPath);
+            if (index != null) {
+                LOGGER.warn("unregistering invalid index {}", resourceRelPath);
+                lcManager.unregister(resourceRelPath);
+            } else {
+                LOGGER.warn("initializing unregistered invalid index {}", resourceRelPath);
+                try {
+                    index = resource.getResource().createInstance(appContext.getServiceContext());
+                } catch (Exception e) {
+                    LOGGER.warn("failed to initialize invalid index {}", resourceRelPath, e);
+                }
+            }
+            localResourceRepository.delete(resourceRelPath);
+            if (index != null) {
+                index.destroy();
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return StorageCleanupRequestMessage.class.getSimpleName();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java
new file mode 100644
index 0000000..6a51c2d
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/VoidResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker.ResponseState;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.common.messaging.api.INcResponse;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A response to a request only indicating success or failure
+ */
+public class VoidResponse implements ICcAddressedMessage, INcResponse {
+
+    private static final long serialVersionUID = 1L;
+    private final Long reqId;
+    private final Throwable failure;
+
+    public VoidResponse(Long reqId, Throwable failure) {
+        this.reqId = reqId;
+        this.failure = failure;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException {
+        ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker();
+        broker.respond(reqId, this);
+    }
+
+    @Override
+    public void setResult(MutablePair<ResponseState, Object> result) {
+        if (failure != null) {
+            result.setLeft(ResponseState.FAILURE);
+            result.setRight(failure);
+        } else {
+            result.setLeft(ResponseState.SUCCESS);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{ \"response\" : \"" + (failure == null ? "success" : failure.getClass().getSimpleName()) + "\"}";
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 8165316..cf2af95 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -18,11 +18,15 @@
  */
 package org.apache.asterix.hyracks.bootstrap;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.asterix.app.message.StorageCleanupRequestMessage;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
@@ -32,6 +36,7 @@ import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -111,6 +116,10 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
             LOGGER.info("Starting Global Recovery");
             MetadataManager.INSTANCE.init();
             MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
+                int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
+                performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
+            }
             mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             recoveryCompleted = true;
@@ -122,6 +131,27 @@ public class GlobalRecoveryManager implements IGlobalRecoveryManager {
         }
     }
 
+    protected void performGlobalStorageCleanup(MetadataTransactionContext mdTxnCtx, int storageGlobalCleanupTimeoutSecs)
+            throws Exception {
+        List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        Set<Integer> validDatasetIds = new HashSet<>();
+        for (Dataverse dataverse : dataverses) {
+            List<Dataset> dataverseDatasets =
+                    MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName());
+            dataverseDatasets.stream().map(Dataset::getDatasetId).forEach(validDatasetIds::add);
+        }
+        ICcApplicationContext ccAppCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes());
+        CCMessageBroker messageBroker = (CCMessageBroker) ccAppCtx.getServiceContext().getMessageBroker();
+        long reqId = messageBroker.newRequestId();
+        List<StorageCleanupRequestMessage> requests = new ArrayList<>();
+        for (int i = 0; i < ncs.size(); i++) {
+            requests.add(new StorageCleanupRequestMessage(reqId, validDatasetIds));
+        }
+        messageBroker.sendSyncRequestToNCs(reqId, ncs, requests,
+                TimeUnit.SECONDS.toMillis(storageGlobalCleanupTimeoutSecs));
+    }
+
     protected MetadataTransactionContext doRecovery(ICcApplicationContext appCtx, MetadataTransactionContext mdTxnCtx)
             throws Exception {
         // Loop over datasets
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java
new file mode 100644
index 0000000..84107fb
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/GlobalStorageCleanupTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class GlobalStorageCleanupTest {
+
+    public static final Logger LOGGER = LogManager.getLogger();
+    private static TestNodeController nc;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        System.out.println("SetUp: ");
+        TestHelper.deleteExistingInstanceFiles();
+        String configPath = System.getProperty("user.dir") + File.separator + "src" + File.separator + "test"
+                + File.separator + "resources" + File.separator + "cc.conf";
+        nc = new TestNodeController(configPath, false);
+    }
+
+    @Test
+    public void globalStorageCleanup() throws Exception {
+        nc.init(true);
+        LSMFlushRecoveryTest.nc = nc;
+        LSMFlushRecoveryTest lsmFlushRecoveryTest = new LSMFlushRecoveryTest();
+        lsmFlushRecoveryTest.initializeTestCtx();
+        lsmFlushRecoveryTest.createIndex();
+        lsmFlushRecoveryTest.readIndex();
+        nc.deInit(false);
+        nc.init(false);
+        // the index should be deleted after the node initialization
+        lsmFlushRecoveryTest.initializeTestCtx();
+        boolean failedToReadIndex = false;
+        try {
+            lsmFlushRecoveryTest.readIndex();
+        } catch (Exception e) {
+            failedToReadIndex = true;
+            Assert.assertTrue(e.getMessage().contains(ErrorCode.INDEX_DOES_NOT_EXIST.errorCode()));
+        }
+        Assert.assertTrue(failedToReadIndex);
+        nc.deInit(false);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
index c3a6839..9c6e95e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java
@@ -77,7 +77,7 @@ import org.junit.Test;
 
 public class LSMFlushRecoveryTest {
     public static final Logger LOGGER = LogManager.getLogger();
-    private static TestNodeController nc;
+    public static TestNodeController nc;
     private static Dataset dataset;
     private static PrimaryIndexInfo[] primaryIndexInfos;
     private static SecondaryIndexInfo[] secondaryIndexInfo;
@@ -156,6 +156,10 @@ public class LSMFlushRecoveryTest {
     }
 
     private void initializeNc(boolean cleanUpOnStart) throws Exception {
+        // disable global clean up for this test to allow internal index creation
+        List<Pair<IOption, Object>> opts = new ArrayList<>();
+        opts.add(Pair.of(Option.STORAGE_GLOBAL_CLEANUP, false));
+        nc.setOpts(opts);
         nc.init(cleanUpOnStart);
         ncAppCtx = nc.getAppRuntimeContext();
         // Override the LSMIOScheduler to avoid halting on failure and enable
@@ -177,7 +181,7 @@ public class LSMFlushRecoveryTest {
         dsLifecycleMgr = ncAppCtx.getDatasetLifecycleManager();
     }
 
-    private void createIndex() throws Exception {
+    public void createIndex() throws Exception {
         dataset = StorageTestUtils.DATASET;
         secondaryIndexEntity = new Index(dataset.getDataverseName(), dataset.getDatasetName(), SECONDARY_INDEX_NAME,
                 SECONDARY_INDEX_TYPE, SECONDARY_INDEX_FIELD_NAMES, SECONDARY_INDEX_FIELD_INDICATORS,
@@ -193,7 +197,7 @@ public class LSMFlushRecoveryTest {
 
     }
 
-    private void initializeTestCtx() throws Exception {
+    public void initializeTestCtx() throws Exception {
         JobId jobId = nc.newJobId();
         testCtxs = new IHyracksTaskContext[NUM_PARTITIONS];
         for (int i = 0; i < NUM_PARTITIONS; i++) {
@@ -203,7 +207,7 @@ public class LSMFlushRecoveryTest {
                 new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
     }
 
-    private void readIndex() throws HyracksDataException {
+    public void readIndex() throws HyracksDataException {
         primaryIndexDataflowHelpers = new IIndexDataflowHelper[NUM_PARTITIONS];
         primaryIndexes = new TestLsmBtree[NUM_PARTITIONS];
         for (int i = 0; i < NUM_PARTITIONS; i++) {
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index eba50dc..f4dfdd1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -44,6 +44,7 @@
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
     "storage.compression.block" : "snappy",
+    "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index 98faa65..8fc48f9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -44,6 +44,7 @@
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
     "storage.compression.block" : "snappy",
+    "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index a92f7d1..ed265e5 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -44,6 +44,7 @@
     "replication\.timeout" : 30,
     "ssl\.enabled" : false,
     "storage.compression.block" : "snappy",
+    "storage.global.cleanup.timeout" : 600,
     "storage.lsm.bloomfilter.falsepositiverate" : 0.01,
     "txn\.commitprofiler\.enabled" : false,
     "txn\.commitprofiler\.reportinterval" : 5,
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
index d9463bf..12c9c68 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/StorageProperties.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.config;
 
+import static org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN;
 import static org.apache.hyracks.control.common.config.OptionTypes.DOUBLE;
 import static org.apache.hyracks.control.common.config.OptionTypes.INTEGER_BYTE_UNIT;
 import static org.apache.hyracks.control.common.config.OptionTypes.LONG_BYTE_UNIT;
@@ -27,6 +28,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.STRING;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.KILOBYTE;
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.MEGABYTE;
 
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
@@ -57,7 +59,9 @@ public class StorageProperties extends AbstractProperties {
         STORAGE_WRITE_RATE_LIMIT(LONG_BYTE_UNIT, 0l),
         STORAGE_MAX_CONCURRENT_FLUSHES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
         STORAGE_MAX_SCHEDULED_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 8),
-        STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2);
+        STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION(NONNEGATIVE_INTEGER, 2),
+        STORAGE_GLOBAL_CLEANUP(BOOLEAN, true),
+        STORAGE_GLOBAL_CLEANUP_TIMEOUT(POSITIVE_INTEGER, (int) TimeUnit.MINUTES.toSeconds(10));
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -72,6 +76,8 @@ public class StorageProperties extends AbstractProperties {
             switch (this) {
                 case STORAGE_COMPRESSION_BLOCK:
                 case STORAGE_LSM_BLOOMFILTER_FALSEPOSITIVERATE:
+                case STORAGE_GLOBAL_CLEANUP:
+                case STORAGE_GLOBAL_CLEANUP_TIMEOUT:
                     return Section.COMMON;
                 default:
                     return Section.NC;
@@ -119,6 +125,10 @@ public class StorageProperties extends AbstractProperties {
                     return "The maximum number of scheduled merges per partition (0 means unlimited)";
                 case STORAGE_MAX_CONCURRENT_MERGES_PER_PARTITION:
                     return "The maximum number of concurrently executed merges per partition (0 means unlimited)";
+                case STORAGE_GLOBAL_CLEANUP:
+                    return "Indicates whether or not global storage cleanup is performed";
+                case STORAGE_GLOBAL_CLEANUP_TIMEOUT:
+                    return "The maximum time to wait for nodes to respond to global storage cleanup requests";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -138,6 +148,11 @@ public class StorageProperties extends AbstractProperties {
         public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) {
             return null;
         }
+
+        @Override
+        public boolean hidden() {
+            return this == STORAGE_GLOBAL_CLEANUP;
+        }
     }
 
     public static final long MAX_HEAP_BYTES = Runtime.getRuntime().maxMemory();
@@ -227,6 +242,14 @@ public class StorageProperties extends AbstractProperties {
         return value != 0 ? value * numPartitions : Integer.MAX_VALUE;
     }
 
+    public boolean isStorageGlobalCleanup() {
+        return accessor.getBoolean(Option.STORAGE_GLOBAL_CLEANUP);
+    }
+
+    public int getStorageGlobalCleanupTimeout() {
+        return accessor.getInt(Option.STORAGE_GLOBAL_CLEANUP_TIMEOUT);
+    }
+
     protected int getMetadataDatasets() {
         return MetadataIndexImmutableProperties.METADATA_DATASETS_COUNT;
     }