You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2020/08/24 21:44:13 UTC

[asterixdb] branch master updated: [NO ISSUE][STO] Add API to get datasets pending IO

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3977ad2  [NO ISSUE][STO] Add API to get datasets pending IO
3977ad2 is described below

commit 3977ad24df8e2d3cc72c2226ec8ad6f6f58dddf6
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Thu Aug 20 20:08:46 2020 +0300

    [NO ISSUE][STO] Add API to get datasets pending IO
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add a new API that gets the number of pending IO (flush/merge)
      operations for all datasets on an NC.
    
    Change-Id: I062de60e36677f138c60855ff565a0610d80c998
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7644
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../common/api/IDatasetLifecycleManager.java       |  6 ++++
 .../common/context/BaseOperationTracker.java       |  6 ++--
 .../apache/asterix/common/context/DatasetInfo.java | 37 +++++++++++++++++--
 .../common/context/DatasetLifecycleManager.java    | 11 ++++++
 .../ioopcallbacks/LSMIOOperationCallback.java      |  4 +--
 .../asterix/common/storage/StorageIOStats.java     | 41 ++++++++++++++++++++++
 .../storage/am/lsm/common/api/ILSMIOOperation.java |  3 +-
 7 files changed, 100 insertions(+), 8 deletions(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 10eddb1..f7cbf18 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.context.IndexInfo;
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -143,4 +144,9 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
      * @throws HyracksDataException
      */
     void waitForIO(IReplicationStrategy replicationStrategy) throws HyracksDataException;
+
+    /**
+     * @return the pending IO (flush/merge) operation counts summed over all datasets
+     */
+    StorageIOStats getDatasetsIOStats();
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 3ff13cb..f8a81e4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.common.context;
 
+import static org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType.REPLICATE;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -38,7 +40,7 @@ public class BaseOperationTracker implements ITransactionOperationTracker {
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation();
+            dsInfo.declareActiveIOOperation(REPLICATE);
         }
     }
 
@@ -54,7 +56,7 @@ public class BaseOperationTracker implements ITransactionOperationTracker {
     public void completeOperation(ILSMIndex index, LSMOperationType opType, ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation();
+            dsInfo.undeclareActiveIOOperation(REPLICATE);
         }
     }
 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 3fcc528..72dacc4 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -28,6 +28,7 @@ import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -42,6 +43,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
     private int numActiveIOOps;
+    private int pendingFlushes;
+    private int pendingMerges;
     private long lastAccess;
     private boolean isExternal;
     private boolean isRegistered;
@@ -70,12 +73,32 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         setLastAccess(System.currentTimeMillis());
     }
 
-    public synchronized void declareActiveIOOperation() {
+    public synchronized void declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
         numActiveIOOps++;
+        switch (opType) {
+            case FLUSH:
+                pendingFlushes++;
+                break;
+            case MERGE:
+                pendingMerges++;
+                break;
+            default:
+                break;
+        }
     }
 
-    public synchronized void undeclareActiveIOOperation() {
+    public synchronized void undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
         numActiveIOOps--;
+        switch (opType) {
+            case FLUSH:
+                pendingFlushes--;
+                break;
+            case MERGE:
+                pendingMerges--;
+                break;
+            default:
+                break;
+        }
         //notify threads waiting on this dataset info
         notifyAll();
     }
@@ -204,7 +227,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
             while (numActiveIOOps > 0) {
                 try {
                     /**
-                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType)}
                      */
                     wait();
                 } catch (InterruptedException e) {
@@ -220,4 +243,12 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
             }
         }
     }
+
+    public synchronized int getPendingFlushes() {
+        return pendingFlushes;
+    }
+
+    public synchronized int getPendingMerges() {
+        return pendingMerges;
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 0750749..5ea79b3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,6 +38,7 @@ import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.common.storage.StorageIOStats;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
@@ -519,6 +520,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         }
     }
 
+    @Override
+    public StorageIOStats getDatasetsIOStats() {
+        StorageIOStats stats = new StorageIOStats();
+        for (DatasetResource dsr : datasets.values()) {
+            stats.addPendingFlushes(dsr.getDatasetInfo().getPendingFlushes());
+            stats.addPendingMerges(dsr.getDatasetInfo().getPendingMerges());
+        }
+        return stats;
+    }
+
     private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
         if (indexInfo.isOpen()) {
             ILSMOperationTracker opTracker = indexInfo.getIndex().getOperationTracker();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index c3737da..ce6d253 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -259,7 +259,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
 
     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws HyracksDataException {
-        dsInfo.declareActiveIOOperation();
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +282,7 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
                         pendingFlushes == 0 ? firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation();
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
     }
 
     public synchronized boolean hasPendingFlush() {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
new file mode 100644
index 0000000..0b44f76
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/StorageIOStats.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+public class StorageIOStats {
+
+    private int pendingFlushes;
+    private int pendingMerges;
+
+    public void addPendingFlushes(int pending) {
+        pendingFlushes += pending;
+    }
+
+    public void addPendingMerges(int pending) {
+        pendingMerges += pending;
+    }
+
+    public int getPendingFlushes() {
+        return pendingFlushes;
+    }
+
+    public int getPendingMerges() {
+        return pendingMerges;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
index 8084c81..753d27a 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperation.java
@@ -37,7 +37,8 @@ public interface ILSMIOOperation extends Callable<LSMIOOperationStatus>, IPageWr
         FLUSH,
         MERGE,
         LOAD,
-        NOOP
+        NOOP,
+        REPLICATE
     }
 
     /**