You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/03/16 17:08:51 UTC

asterixdb git commit: [NO ISSUE][RT] Fix wait for IO operations

Repository: asterixdb
Updated Branches:
  refs/heads/master 8798c324a -> 282a79198


[NO ISSUE][RT] Fix wait for IO operations

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Some operations such as close dataset, delete components,
  and drop index need to wait for IO operations.
- Before this change, the wait for IO operations would just check
  the count of IO operations on the dataset info. This is not
  enough: a flush might already have started by writing its flush
  log to the log tail, but the flush operation is only triggered,
  and the IO operation count only incremented, once that log
  record itself has been flushed.
- To address this problem, we write a wait log before we check the
  IO operation count, ensuring that any flush logs in the log tail
  have been flushed and the corresponding counts incremented.

Change-Id: Ibfa883410cd24e0af54732f7ea6f1b4eb2184e8e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2495
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/282a7919
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/282a7919
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/282a7919

Branch: refs/heads/master
Commit: 282a79198af022563c3d93c51e0d9a052e140ab2
Parents: 8798c32
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Thu Mar 15 22:52:53 2018 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Fri Mar 16 10:08:33 2018 -0700

----------------------------------------------------------------------
 .../asterix/common/context/DatasetInfo.java     | 43 ++++++++++++--------
 .../common/context/DatasetLifecycleManager.java |  2 +-
 .../CorrelatedPrefixMergePolicyTest.java        |  2 +-
 .../management/service/logging/LogManager.java  |  3 +-
 4 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/282a7919/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 44baf77..f4d764a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -23,6 +23,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.logging.log4j.LogManager;
@@ -35,6 +38,8 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
     private final int datasetID;
+    private final ILogManager logManager;
+    private final LogRecord waitLog = new LogRecord();
     private int numActiveIOOps;
     private long lastAccess;
     private boolean isExternal;
@@ -42,13 +47,16 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private boolean memoryAllocated;
     private boolean durable;
 
-    public DatasetInfo(int datasetID) {
+    public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
         this.setMemoryAllocated(false);
+        this.logManager = logManager;
+        waitLog.setLogType(LogType.WAIT);
+        waitLog.computeAndSetLogSize();
     }
 
     @Override
@@ -199,23 +207,26 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
         this.lastAccess = lastAccess;
     }
 
-    public synchronized void waitForIO() throws HyracksDataException {
-        while (numActiveIOOps > 0) {
-            try {
-                /**
-                 * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
-                 */
-                wait();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw HyracksDataException.create(e);
+    public void waitForIO() throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (numActiveIOOps > 0) {
+                try {
+                    /**
+                     * Will be Notified by {@link DatasetInfo#undeclareActiveIOOperation()}
+                     */
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
             }
-        }
-        if (numActiveIOOps < 0) {
-            if (LOGGER.isErrorEnabled()) {
-                LOGGER.error("Number of IO operations cannot be negative for dataset: " + this);
+            if (numActiveIOOps < 0) {
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER.error("Number of IO operations cannot be negative for dataset: " + this);
+                }
+                throw new IllegalStateException("Number of IO operations cannot be negative");
             }
-            throw new IllegalStateException("Number of IO operations cannot be negative");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/282a7919/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index f25e4f6..b715eec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -239,7 +239,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
         synchronized (datasets) {
             dsr = datasets.get(did);
             if (dsr == null) {
-                DatasetInfo dsInfo = new DatasetInfo(did);
+                DatasetInfo dsInfo = new DatasetInfo(did, logManager);
                 int partitions = MetadataIndexImmutableProperties.isMetadataDataset(did) ? METADATA_DATASETS_PARTITIONS
                         : numPartitions;
                 DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/282a7919/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index f9f742a..befbeed 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -189,7 +189,7 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
         properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
         properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
 
-        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+        DatasetInfo dsInfo = new DatasetInfo(DATASET_ID, null);
         for (IndexInfo index : indexInfos) {
             dsInfo.addIndex(index.getResourceId(), index);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/282a7919/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 6208cef..3ada608 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -161,7 +161,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     synchronized void syncAppendToLogTail(ILogRecord logRecord) {
-        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH) {
+        if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+                && logRecord.getLogType() != LogType.WAIT) {
             ITransactionContext txnCtx = logRecord.getTxnCtx();
             if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
                 throw new ACIDException(