You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2018/03/21 20:52:37 UTC

[1/5] asterixdb git commit: [NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable

Repository: asterixdb
Updated Branches:
  refs/heads/release-0.9.4-pre-rc [created] e6587f628


[NO ISSUE][RT] Report all errors on SuperActivityOperatorNodePushable

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Currently, if a failure happens in SuperActivityOperatorNodePushable,
  we only report that failure and miss the rest of the failures.
  This is especially critical in case of job cancellation since we
  don't know where each thread was interrupted.
- After this change, we suppress all other failures in the root
  failure for reporting purposes.

Change-Id: Ibbf31dd91303ce2f606734fcccb19270875266b3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2500
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3c32971e
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3c32971e
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3c32971e

Branch: refs/heads/release-0.9.4-pre-rc
Commit: 3c32971e4636bc99784cd62185718a401a45e059
Parents: 1b412c5
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Mar 19 18:40:43 2018 -0700
Committer: Michael Blow <mb...@apache.org>
Committed: Mon Mar 19 21:25:18 2018 -0700

----------------------------------------------------------------------
 .../SuperActivityOperatorNodePushable.java      | 22 +++++--
 .../org/apache/hyracks/control/nc/Task.java     | 65 +++++++++-----------
 2 files changed, 46 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3c32971e/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 83ab532..d499554 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
@@ -44,6 +45,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.util.ExceptionUtils;
 
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
@@ -193,15 +195,20 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
     }
 
     private void runInParallel(OperatorNodePushableAction action) throws HyracksDataException {
-        List<Future<Void>> tasks = new ArrayList<>();
+        List<Future<Void>> tasks = new ArrayList<>(operatorNodePushablesBFSOrder.size());
+        Queue<Throwable> failures = new ArrayBlockingQueue<>(operatorNodePushablesBFSOrder.size());
         final Semaphore startSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
         final Semaphore completeSemaphore = new Semaphore(1 - operatorNodePushablesBFSOrder.size());
+        Throwable root = null;
         try {
             for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
                 tasks.add(ctx.getExecutorService().submit(() -> {
                     startSemaphore.release();
                     try {
                         action.run(op);
+                    } catch (Throwable th) { // NOSONAR: Must catch all causes of failure
+                        failures.offer(th);
+                        throw th;
                     } finally {
                         completeSemaphore.release();
                     }
@@ -211,13 +218,16 @@ public class SuperActivityOperatorNodePushable implements IOperatorNodePushable
             for (Future<Void> task : tasks) {
                 task.get();
             }
-        } catch (InterruptedException e) {
-            cancelTasks(tasks, startSemaphore, completeSemaphore);
-            Thread.currentThread().interrupt();
-            throw HyracksDataException.create(e);
         } catch (ExecutionException e) {
+            root = e.getCause();
+        } catch (Throwable e) { // NOSONAR: Must catch all causes of failure
+            root = e;
+        }
+        if (root != null) {
+            final Throwable failure = root;
             cancelTasks(tasks, startSemaphore, completeSemaphore);
-            throw HyracksDataException.create(e.getCause());
+            failures.forEach(t -> ExceptionUtils.suppress(failure, t));
+            throw HyracksDataException.create(failure);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3c32971e/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index dcfc291..9d99968 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -102,7 +102,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
 
     private volatile boolean aborted;
 
-    private NodeControllerService ncs;
+    private final NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
@@ -286,67 +286,62 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         }
         ct.setName(displayName + ":" + taskAttemptId + ":" + 0);
         try {
-            Exception operatorException = null;
+            Throwable operatorException = null;
             try {
                 operator.initialize();
                 if (collectors.length > 0) {
                     final Semaphore sem = new Semaphore(collectors.length - 1);
                     for (int i = 1; i < collectors.length; ++i) {
+                        // Q. Do we ever have a task that has more than one collector?
                         final IPartitionCollector collector = collectors[i];
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
-                        sem.acquire();
+                        sem.acquireUninterruptibly();
                         final int cIdx = i;
                         executorService.execute(() -> {
-                            Thread thread = Thread.currentThread();
-                            // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
-                            // the thread is not escaped from interruption.
-                            if (!addPendingThread(thread)) {
-                                return;
-                            }
-                            thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
-                            thread.setPriority(Thread.MIN_PRIORITY);
                             try {
-                                pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
-                            } catch (HyracksDataException e) {
-                                synchronized (Task.this) {
-                                    exceptions.add(e);
+                                Thread thread = Thread.currentThread();
+                                // Calls synchronized addPendingThread(..) to make sure that in the abort() method,
+                                // the thread is not escaped from interruption.
+                                if (!addPendingThread(thread)) {
+                                    return;
+                                }
+                                thread.setName(displayName + ":" + taskAttemptId + ":" + cIdx);
+                                thread.setPriority(Thread.MIN_PRIORITY);
+                                try {
+                                    pushFrames(collector, inputChannelsFromConnectors.get(cIdx), writer);
+                                } catch (HyracksDataException e) {
+                                    synchronized (Task.this) {
+                                        exceptions.add(e);
+                                    }
+                                } finally {
+                                    removePendingThread(thread);
                                 }
                             } finally {
                                 sem.release();
-                                removePendingThread(thread);
                             }
                         });
                     }
                     try {
                         pushFrames(collectors[0], inputChannelsFromConnectors.get(0), operator.getInputFrameWriter(0));
                     } finally {
-                        sem.acquire(collectors.length - 1);
+                        sem.acquireUninterruptibly(collectors.length - 1);
                     }
                 }
-            } catch (Exception e) {
-                // Store the operator exception
+            } catch (Throwable e) { // NOSONAR: Must catch all failures
                 operatorException = e;
-                throw e;
             } finally {
                 try {
                     operator.deinitialize();
-                } catch (Exception e) {
-                    if (operatorException != null) {
-                        // Add deinitialize exception to the operator exception to keep track of both
-                        operatorException.addSuppressed(e);
-                    } else {
-                        operatorException = e;
-                    }
-                    throw operatorException;
+                } catch (Throwable e) { // NOSONAR: Must catch all failures
+                    operatorException = ExceptionUtils.suppress(operatorException, e);
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
+            if (operatorException != null) {
+                throw operatorException;
+            }
             ncs.getWorkQueue().schedule(new NotifyTaskCompleteWork(ncs, this));
-        } catch (InterruptedException e) {
-            exceptions.add(e);
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            exceptions.add(e);
+        } catch (Throwable e) { // NOSONAR: Catch all failures
+            exceptions.add(HyracksDataException.create(e));
         } finally {
             close();
             removePendingThread(ct);
@@ -360,7 +355,6 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
                             exceptions.get(i));
                 }
             }
-            NodeControllerService ncs = joblet.getNodeController();
             ExceptionUtils.setNodeIds(exceptions, ncs.getId());
             ncs.getWorkQueue()
                     .schedule(new NotifyTaskFailureWork(ncs, this, exceptions, joblet.getJobId(), taskAttemptId));
@@ -457,6 +451,7 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
         return ncs.createOrGetJobParameterByteStore(joblet.getJobId()).getParameterValue(name, start, length);
     }
 
+    @Override
     public Set<JobFlag> getJobFlags() {
         return jobFlags;
     }


[2/5] asterixdb git commit: [NO ISSUE] Simplify IoUtil delete API

Posted by im...@apache.org.
[NO ISSUE] Simplify IoUtil delete API

Change-Id: I0dabcb642fa3007b3b6e9d30be9911a22ed8f252
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2502
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/60bdec3b
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/60bdec3b
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/60bdec3b

Branch: refs/heads/release-0.9.4-pre-rc
Commit: 60bdec3bfb328806aa97bbb1580446e994c1ab9b
Parents: 3c32971
Author: Michael Blow <mb...@apache.org>
Authored: Tue Mar 20 09:37:22 2018 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Tue Mar 20 09:52:17 2018 -0700

----------------------------------------------------------------------
 .../replication/messaging/DropIndexTask.java    |  2 +-
 .../org/apache/hyracks/api/util/IoUtil.java     | 42 +++++++-------------
 2 files changed, 16 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60bdec3b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index b7f0985..4bd97c1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -52,7 +52,7 @@ public class DropIndexTask implements IReplicaTask {
             final File indexFile = ioManager.resolve(file).getFile();
             if (indexFile.exists()) {
                 File indexDir = indexFile.getParentFile();
-                IoUtil.deleteDirectory(indexDir);
+                IoUtil.delete(indexDir);
                 LOGGER.info(() -> "Deleted index: " + indexFile.getAbsolutePath());
             } else {
                 LOGGER.warning(() -> "Requested to delete a non-existing index: " + indexFile.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/60bdec3b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
index 396c026..03227ee 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/IoUtil.java
@@ -46,30 +46,29 @@ public class IoUtil {
     /**
      * Delete a file
      *
-     * @param fileRef
-     *            the file to be deleted
-     * @throws HyracksDataException
-     *             if the file couldn't be deleted
+     * @param fileRef the file to be deleted
+     * @throws HyracksDataException if the file couldn't be deleted
      */
     public static void delete(FileReference fileRef) throws HyracksDataException {
         delete(fileRef.getFile());
     }
 
     /**
-     * Delete a file
+     * Delete a file or directory
      *
-     * @param file
-     *            the file to be deleted
-     * @throws HyracksDataException
-     *             if the file couldn't be deleted
+     * @param file the file to be deleted
+     * @throws HyracksDataException if the file (or directory if exists) couldn't be deleted
      */
     public static void delete(File file) throws HyracksDataException {
         try {
             if (file.isDirectory()) {
-                deleteDirectory(file);
-            } else {
-                Files.delete(file.toPath());
+                if (!file.exists()) {
+                    return;
+                } else if (!FileUtils.isSymlink(file)) {
+                    cleanDirectory(file);
+                }
             }
+            Files.delete(file.toPath());
         } catch (NoSuchFileException | FileNotFoundException e) {
             LOGGER.warn(() -> FILE_NOT_FOUND_MSG + ": " + e.getMessage(), e);
         } catch (IOException e) {
@@ -80,10 +79,8 @@ public class IoUtil {
     /**
      * Create a file on disk
      *
-     * @param fileRef
-     *            the file to create
-     * @throws HyracksDataException
-     *             if the file already exists or if it couldn't be created
+     * @param fileRef the file to create
+     * @throws HyracksDataException if the file already exists or if it couldn't be created
      */
     public static void create(FileReference fileRef) throws HyracksDataException {
         if (fileRef.getFile().exists()) {
@@ -99,17 +96,7 @@ public class IoUtil {
         }
     }
 
-    public static void deleteDirectory(File directory) throws IOException {
-        if (!directory.exists()) {
-            return;
-        }
-        if (!FileUtils.isSymlink(directory)) {
-            cleanDirectory(directory);
-        }
-        Files.delete(directory.toPath());
-    }
-
-    public static void cleanDirectory(final File directory) throws IOException {
+    private static void cleanDirectory(final File directory) throws IOException {
         final File[] files = verifiedListFiles(directory);
         for (final File file : files) {
             delete(file);
@@ -133,4 +120,5 @@ public class IoUtil {
         }
         return files;
     }
+
 }


[3/5] asterixdb git commit: [ASTERIXDB-2321][STO] Follow the contract in IIndexCursor.open calls

Posted by im...@apache.org.
[ASTERIXDB-2321][STO] Follow the contract in IIndexCursor.open calls

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- The index cursor contract says that an open call which returns
  successfully, leaves the cursor in the open state, otherwise,
  the cursor remains in the closed state.
- The LSM cursors have many cursors inside. In the
  case where one of the cursors fails to open, and an exception
  is about to be thrown, we must close all previously open cursors
  since the LSM cursor will be in the closed state
  and close will not be called.

Change-Id: I19db2afd2d6ca4a2ca1056cd95ae504b2be69813
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2501
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4d0de7dd
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4d0de7dd
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4d0de7dd

Branch: refs/heads/release-0.9.4-pre-rc
Commit: 4d0de7dd511f77c1ec9693a03a6506156b2dc007
Parents: 60bdec3
Author: Abdullah Alamoudi <ba...@gmail.com>
Authored: Mon Mar 19 18:42:36 2018 -0700
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Tue Mar 20 10:03:00 2018 -0700

----------------------------------------------------------------------
 .../impls/LSMBTreeDiskComponentScanCursor.java  |  22 ++--
 .../btree/impls/LSMBTreeRangeSearchCursor.java  |  16 ++-
 .../impls/LSMBTreeWithBuddySortedCursor.java    |  20 +++-
 .../btree/impls/LSMBuddyBTreeMergeCursor.java   |  13 ++-
 .../storage/am/lsm/common/api/ILSMIndex.java    |   9 ++
 ...nvertedIndexDeletedKeysBTreeMergeCursor.java |  14 ++-
 .../LSMRTreeDeletedKeysBTreeMergeCursor.java    |  11 +-
 .../lsm/rtree/impls/LSMRTreeSortedCursor.java   |  20 ++--
 ...LSMRTreeWithAntiMatterTuplesFlushCursor.java |  13 +--
 ...SMRTreeWithAntiMatterTuplesSearchCursor.java |  31 ++++--
 .../storage/common/util/IndexCursorUtils.java   | 109 +++++++++++++++++++
 11 files changed, 223 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
index a120296..a6c8393 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponentScanCursor.java
@@ -40,6 +40,7 @@ import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
 
@@ -77,15 +78,20 @@ public class LSMBTreeDiskComponentScanCursor extends LSMIndexSearchCursor {
             BTree btree = (BTree) component.getIndex();
             btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             rangeCursors[i] = btreeAccessors[i].createSearchCursor(false);
-            btreeAccessors[i].search(rangeCursors[i], searchPred);
         }
-
-        cursorIndexPointable = new IntegerPointable();
-        int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
-        cursorIndexPointable.set(new byte[length], 0, length);
-
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+        try {
+            cursorIndexPointable = new IntegerPointable();
+            int length = IntegerPointable.TYPE_TRAITS.getFixedLength();
+            cursorIndexPointable.set(new byte[length], 0, length);
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must call this on
+            for (int i = 0; i < numBTrees; i++) {
+                IndexCursorUtils.close(rangeCursors[i], th);
+            }
+            throw HyracksDataException.create(th);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
index 0cb7d1c..bfda985 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java
@@ -42,6 +42,7 @@ import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
     private final ArrayTupleReference copyTuple;
@@ -349,7 +350,6 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
             rangeCursors = new IIndexCursor[numBTrees];
             btreeAccessors = new BTreeAccessor[numBTrees];
         }
-
         for (int i = 0; i < numBTrees; i++) {
             ILSMComponent component = operationalComponents.get(i);
             BTree btree;
@@ -365,12 +365,16 @@ public class LSMBTreeRangeSearchCursor extends LSMIndexSearchCursor {
                 btreeAccessors[i].reset(btree, NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
                 rangeCursors[i].close();
             }
-            btreeAccessors[i].search(rangeCursors[i], searchPred);
         }
-
-        setPriorityQueueComparator();
-        initPriorityQueue();
-        canCallProceed = true;
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, searchPred);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+            canCallProceed = true;
+        } catch (Throwable th) { // NOSONAR Must catch all
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
index 2913be1..b600f12 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddySortedCursor.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBTreeWithBuddySortedCursor extends LSMBTreeWithBuddyAbstractCursor {
     // TODO: This class can be removed and instead use a search cursor that uses
@@ -160,12 +161,21 @@ public class LSMBTreeWithBuddySortedCursor extends LSMBTreeWithBuddyAbstractCurs
         foundNext = false;
         for (int i = 0; i < numberOfTrees; i++) {
             btreeCursors[i].close();
-            btreeAccessors[i].search(btreeCursors[i], btreeRangePredicate);
-            if (btreeCursors[i].hasNext()) {
-                btreeCursors[i].next();
-            } else {
-                depletedBtreeCursors[i] = true;
+        }
+        IndexCursorUtils.open(btreeAccessors, btreeCursors, btreeRangePredicate);
+        try {
+            for (int i = 0; i < numberOfTrees; i++) {
+                if (btreeCursors[i].hasNext()) {
+                    btreeCursors[i].next();
+                } else {
+                    depletedBtreeCursors[i] = true;
+                }
+            }
+        } catch (Throwable th) { // NOSONAR Must catch all failures to close before throwing
+            for (int i = 0; i < numberOfTrees; i++) {
+                IndexCursorUtils.close(btreeCursors[i], th);
             }
+            throw HyracksDataException.create(th);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
index fd13e62..b35c5d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBuddyBTreeMergeCursor.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -55,7 +56,6 @@ public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
         lsmHarness = null;
         int numBTrees = operationalComponents.size();
         rangeCursors = new IIndexCursor[numBTrees];
-
         RangePredicate btreePredicate = new RangePredicate(null, null, true, true, cmp, cmp);
         IIndexAccessor[] btreeAccessors = new ITreeIndexAccessor[numBTrees];
         for (int i = 0; i < numBTrees; i++) {
@@ -64,9 +64,14 @@ public class LSMBuddyBTreeMergeCursor extends LSMIndexSearchCursor {
             rangeCursors[i] = new BTreeRangeSearchCursor(leafFrame, false);
             BTree buddyBtree = ((LSMBTreeWithBuddyDiskComponent) component).getBuddyIndex();
             btreeAccessors[i] = buddyBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            btreeAccessors[i].search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 62493f4..a8467d3 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -61,6 +61,15 @@ public interface ILSMIndex extends IIndex {
 
     void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException;
 
+    /**
+     * If this method returns successfully, then the cursor has been opened, and need to be closed
+     * Otherwise, it has not been opened
+     *
+     * @param ictx
+     * @param cursor
+     * @param pred
+     * @throws HyracksDataException
+     */
     void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException;
 
     public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor cursor) throws HyracksDataException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
index 21ce940..f1f5241 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDeletedKeysBTreeMergeCursor.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -48,7 +49,8 @@ public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchC
                 (LSMInvertedIndexRangeSearchCursorInitialState) initialState;
         cmp = lsmInitialState.getOriginalKeyComparator();
         operationalComponents = lsmInitialState.getOperationalComponents();
-        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already do that when we merge the inverted indexes.
+        // We intentionally set the lsmHarness to null so that we don't call lsmHarness.endSearch() because we already
+        // do that when we merge the inverted indexes.
         lsmHarness = null;
         int numBTrees = operationalComponents.size();
         rangeCursors = new IIndexCursor[numBTrees];
@@ -60,7 +62,13 @@ public class LSMInvertedIndexDeletedKeysBTreeMergeCursor extends LSMIndexSearchC
             rangeCursors[i] = btreeAccessors.get(i).createSearchCursor(false);
             btreeAccessors.get(i).search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
index 892fe83..b57b517 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDeletedKeysBTreeMergeCursor.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
 
@@ -65,7 +66,13 @@ public class LSMRTreeDeletedKeysBTreeMergeCursor extends LSMIndexSearchCursor {
             btreeAccessors[i] = btree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             btreeAccessors[i].search(rangeCursors[i], btreePredicate);
         }
-        setPriorityQueueComparator();
-        initPriorityQueue();
+        IndexCursorUtils.open(btreeAccessors, rangeCursors, btreePredicate);
+        try {
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            throw HyracksDataException.create(th);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
index 9bbc3e1..483e082 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeSortedCursor.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
 
@@ -163,14 +164,19 @@ public class LSMRTreeSortedCursor extends LSMRTreeAbstractCursor {
         super.doOpen(initialState, searchPred);
         depletedRtreeCursors = new boolean[numberOfTrees];
         foundNext = false;
-        for (int i = 0; i < numberOfTrees; i++) {
-            rtreeCursors[i].close();
-            rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
-            if (rtreeCursors[i].hasNext()) {
-                rtreeCursors[i].next();
-            } else {
-                depletedRtreeCursors[i] = true;
+        try {
+            for (int i = 0; i < numberOfTrees; i++) {
+                rtreeCursors[i].close();
+                rtreeAccessors[i].search(rtreeCursors[i], rtreeSearchPredicate);
+                if (rtreeCursors[i].hasNext()) {
+                    rtreeCursors[i].next();
+                } else {
+                    depletedRtreeCursors[i] = true;
+                }
             }
+        } catch (Throwable th) { // NOSONAR. Must catch all failures
+            IndexCursorUtils.close(rtreeCursors, th);
+            throw HyracksDataException.create(th);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
index 449c711..427575b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesFlushCursor.java
@@ -27,6 +27,7 @@ import org.apache.hyracks.storage.common.EnforcedIndexCursor;
 import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeWithAntiMatterTuplesFlushCursor extends EnforcedIndexCursor implements ILSMIndexCursor {
     private final TreeTupleSorter rTreeTupleSorter;
@@ -49,17 +50,13 @@ public class LSMRTreeWithAntiMatterTuplesFlushCursor extends EnforcedIndexCursor
 
     @Override
     public void doOpen(ICursorInitialState initialState, ISearchPredicate searchPred) throws HyracksDataException {
-        boolean rtreeOpen = false;
-        boolean btreeOpen = false;
         try {
             rTreeTupleSorter.open(initialState, searchPred);
-            rtreeOpen = true;
             bTreeTupleSorter.open(initialState, searchPred);
-            btreeOpen = true;
-        } finally {
-            if (rtreeOpen && !btreeOpen) {
-                rTreeTupleSorter.close();
-            }
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(bTreeTupleSorter, th);
+            IndexCursorUtils.close(rTreeTupleSorter, th);
+            throw HyracksDataException.create(th);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
index 094acbc..7db65bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuplesSearchCursor.java
@@ -42,6 +42,7 @@ import org.apache.hyracks.storage.common.ICursorInitialState;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
+import org.apache.hyracks.storage.common.util.IndexCursorUtils;
 
 public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCursor {
 
@@ -113,19 +114,25 @@ public class LSMRTreeWithAntiMatterTuplesSearchCursor extends LSMIndexSearchCurs
         rangeCursors = new RTreeSearchCursor[numImmutableComponents];
         ITreeIndexAccessor[] immutableRTreeAccessors = new ITreeIndexAccessor[numImmutableComponents];
         int j = 0;
-        for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
-            ILSMComponent component = operationalComponents.get(i);
-            rangeCursors[j] = new RTreeSearchCursor(
-                    (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
-                    (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
-            RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
-            immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-            immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
-            j++;
+        try {
+            for (int i = numMemoryComponents; i < operationalComponents.size(); i++) {
+                ILSMComponent component = operationalComponents.get(i);
+                rangeCursors[j] = new RTreeSearchCursor(
+                        (IRTreeInteriorFrame) lsmInitialState.getRTreeInteriorFrameFactory().createFrame(),
+                        (IRTreeLeafFrame) lsmInitialState.getRTreeLeafFrameFactory().createFrame());
+                RTree rtree = ((LSMRTreeWithAntimatterDiskComponent) component).getIndex();
+                immutableRTreeAccessors[j] = rtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+                immutableRTreeAccessors[j].search(rangeCursors[j], searchPred);
+                j++;
+            }
+            searchNextCursor();
+            setPriorityQueueComparator();
+            initPriorityQueue();
+        } catch (Throwable th) { // NOSONAR: Must catch all failures
+            IndexCursorUtils.close(rangeCursors, th);
+            IndexCursorUtils.close(mutableRTreeCursors, th);
+            throw HyracksDataException.create(th);
         }
-        searchNextCursor();
-        setPriorityQueueComparator();
-        initPriorityQueue();
         open = true;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4d0de7dd/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
new file mode 100644
index 0000000..7587a2a
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/util/IndexCursorUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.common.util;
+
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.storage.common.IIndexAccessor;
+import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchPredicate;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class IndexCursorUtils {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private IndexCursorUtils() {
+    }
+
+    /**
+     * Close the IIndexCursor and suppress any Throwable thrown by the close call.
+     * This method must NEVER throw any Throwable
+     *
+     * @param cursor
+     *            the cursor to close
+     * @param root
+     *            the first exception encountered during release of resources
+     * @return the root Throwable if not null or a new Throwable if any was thrown, otherwise, it returns null
+     */
+    public static Throwable close(IIndexCursor cursor, Throwable root) {
+        if (cursor != null) {
+            try {
+                cursor.close();
+            } catch (Throwable th) { // NOSONAR Will be suppressed
+                try {
+                    LOGGER.log(Level.WARN, "Failure closing a cursor", th);
+                } catch (Throwable loggingFailure) { // NOSONAR: Ignore catching Throwable
+                    // NOSONAR ignore logging failure
+                }
+                root = ExceptionUtils.suppress(root, th); // NOSONAR: Using the same variable is not bad in this context
+            }
+        }
+        return root;
+    }
+
+    public static void open(List<IIndexAccessor> accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+            throws HyracksDataException {
+        int opened = 0;
+        try {
+            for (int i = 0; i < cursors.length; i++) {
+                if (accessors.get(i) != null) {
+                    accessors.get(i).search(cursors[i], pred);
+                }
+                opened++;
+            }
+        } catch (Throwable th) { // NOSONAR: Much catch all failures
+            for (int j = 0; j < opened; j++) {
+                IndexCursorUtils.close(cursors[j], th);
+            }
+            throw HyracksDataException.create(th);
+        }
+    }
+
+    public static void open(IIndexAccessor[] accessors, IIndexCursor[] cursors, ISearchPredicate pred)
+            throws HyracksDataException {
+        int opened = 0;
+        try {
+            for (int i = 0; i < accessors.length; i++) {
+                if (accessors[i] != null) {
+                    accessors[i].search(cursors[i], pred);
+                }
+                opened++;
+            }
+        } catch (Throwable th) { // NOSONAR: Much catch all failures
+            for (int j = 0; j < opened; j++) {
+                IndexCursorUtils.close(cursors[j], th);
+            }
+            throw HyracksDataException.create(th);
+        }
+    }
+
+    public static Throwable close(IIndexCursor[] cursors, Throwable th) {
+        for (int j = 0; j < cursors.length; j++) {
+            th = IndexCursorUtils.close(cursors[j], th); // NOSONAR: Using the same variable is cleaner in this context
+        }
+        return th;
+    }
+
+}


[5/5] asterixdb git commit: [ASTERIXDB-1708][TX] Prevent log deletion during scan

Posted by im...@apache.org.
[ASTERIXDB-1708][TX] Prevent log deletion during scan

Right now there is a potential for a soft checkpoint to delete a
log file that is about to be read as part of a transaction rollback.
This patch stops the soft checkpoint from proceeding if a rollback
is about to take place and vice-versa.

Change-Id: Icff1a520af24c8fac8e5836cdbf46425b78b1260
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2508
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e6587f62
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e6587f62
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e6587f62

Branch: refs/heads/release-0.9.4-pre-rc
Commit: e6587f628a72f10594e676ec375c1cf26e73ec53
Parents: 01d7fe9
Author: Ian Maxon <im...@apache.org>
Authored: Wed Mar 21 11:58:21 2018 -0700
Committer: Ian Maxon <im...@apache.org>
Committed: Wed Mar 21 13:51:47 2018 -0700

----------------------------------------------------------------------
 .../apache/asterix/app/nc/RecoveryManager.java  |  19 +--
 .../asterix/test/logging/CheckpointingTest.java | 116 ++++++++++++++++---
 .../common/transactions/ICheckpointManager.java |  17 ++-
 .../management/service/logging/LogManager.java  |  61 +++-------
 .../service/recovery/CheckpointManager.java     |  28 ++++-
 .../service/transaction/TransactionManager.java |   5 +-
 6 files changed, 173 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4a2cf2d..4b14a9c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -59,6 +59,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -104,6 +105,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     private SystemState state;
     private final INCServiceContext serviceCtx;
     private final INcApplicationContext appCtx;
+    private static final TxnId recoveryTxnId = new TxnId(-1);
 
     public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
@@ -505,21 +507,24 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
     }
 
     @Override
-    public void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException {
-        long minLSN = getPartitionsMinLSN(partitions);
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        if (minLSN < readableSmallestLSN) {
-            minLSN = readableSmallestLSN;
-        }
-
+    public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush)
+            throws HyracksDataException {
         //replay logs > minLSN that belong to these partitions
         try {
+            checkpointManager.secure(recoveryTxnId);
+            long minLSN = getPartitionsMinLSN(partitions);
+            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+            if (minLSN < readableSmallestLSN) {
+                minLSN = readableSmallestLSN;
+            }
             replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN);
             if (flush) {
                 appCtx.getDatasetLifecycleManager().flushAllDatasets();
             }
         } catch (IOException | ACIDException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            checkpointManager.completed(recoveryTxnId);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 91d98e5..418282e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -28,16 +28,19 @@ import java.util.List;
 import org.apache.asterix.app.bootstrap.TestNodeController;
 import org.apache.asterix.app.data.gen.TupleGenerator;
 import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.app.nc.RecoveryManager;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.asterix.external.util.DataflowUtils;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -50,20 +53,23 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.test.common.TestHelper;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
 import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.asterix.transaction.management.service.transaction.TransactionManager;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class CheckpointingTest {
 
@@ -88,6 +94,8 @@ public class CheckpointingTest {
     private static final String DATASET_NAME = "TestDS";
     private static final String DATA_TYPE_NAME = "DUMMY";
     private static final String NODE_GROUP_NAME = "DEFAULT";
+    private volatile boolean threadException = false;
+    private Throwable exception = null;
 
     @Before
     public void setUp() throws Exception {
@@ -128,7 +136,7 @@ public class CheckpointingTest {
                 VSizeFrame frame = new VSizeFrame(ctx);
                 FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
 
-                IRecoveryManager recoveryManager = nc.getTransactionSubsystem().getRecoveryManager();
+                RecoveryManager recoveryManager = (RecoveryManager) nc.getTransactionSubsystem().getRecoveryManager();
                 ICheckpointManager checkpointManager = nc.getTransactionSubsystem().getCheckpointManager();
                 LogManager logManager = (LogManager) nc.getTransactionSubsystem().getLogManager();
                 // Number of log files after node startup should be one
@@ -178,20 +186,70 @@ public class CheckpointingTest {
 
                 /*
                  * At this point, the low-water mark is not in the initialLowWaterMarkFileId, so
-                 * a checkpoint should delete it.
+                 * a checkpoint should delete it. We will also start a second
+                  * job to ensure that the checkpointing coexists peacefully
+                  * with other concurrent readers of the log that request
+                  * deletions to be witheld
                  */
-                checkpointManager.tryCheckpoint(recoveryManager.getMinFirstLSN());
 
-                // Validate initialLowWaterMarkFileId was deleted
-                for (Long fileId : logManager.getLogFileIds()) {
-                    Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                JobId jobId2 = nc.newJobId();
+                IHyracksTaskContext ctx2 = nc.createTestContext(jobId2, 0, false);
+                nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx2),
+                        new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
+                // Prepare insert operation
+                LSMInsertDeleteOperatorNodePushable insertOp2 = nc.getInsertPipeline(ctx2, dataset, KEY_TYPES,
+                        RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft();
+                insertOp2.open();
+                VSizeFrame frame2 = new VSizeFrame(ctx2);
+                FrameTupleAppender tupleAppender2 = new FrameTupleAppender(frame2);
+                for (int i = 0; i < 4; i++) {
+                    long lastCkpoint = recoveryManager.getMinFirstLSN();
+                    long lastFileId = logManager.getLogFileId(lastCkpoint);
+
+                    checkpointManager.tryCheckpoint(lowWaterMarkLSN);
+                    // Validate initialLowWaterMarkFileId was deleted
+                    for (Long fileId : logManager.getLogFileIds()) {
+                        Assert.assertNotEquals(initialLowWaterMarkFileId, fileId.longValue());
+                    }
+
+                    while (currentLowWaterMarkLogFileId == lastFileId) {
+                        ITupleReference tuple = tupleGenerator.next();
+                        DataflowUtils.addTupleToFrame(tupleAppender2, tuple, insertOp2);
+                        lowWaterMarkLSN = recoveryManager.getMinFirstLSN();
+                        currentLowWaterMarkLogFileId = logManager.getLogFileId(lowWaterMarkLSN);
+                    }
                 }
+                Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+                    public void uncaughtException(Thread th, Throwable ex) {
+                        threadException = true;
+                        exception = ex;
+                    }
+                };
+
+                Thread t = new Thread(() -> {
+                    TransactionManager spyTxnMgr = spy((TransactionManager) nc.getTransactionManager());
+                    doAnswer((Answer) i -> {
+                        stallAbortTxn(Thread.currentThread(), txnCtx, nc.getTransactionSubsystem(),
+                                (TxnId) i.getArguments()[0]);
+                        return null;
+                    }).when(spyTxnMgr).abortTransaction(any(TxnId.class));
 
-                if (tupleAppender.getTupleCount() > 0) {
-                    tupleAppender.write(insertOp, true);
+                    spyTxnMgr.abortTransaction(txnCtx.getTxnId());
+                });
+                t.setUncaughtExceptionHandler(h);
+                synchronized (t) {
+                    t.start();
+                    t.wait();
+                }
+                long lockedLSN = recoveryManager.getMinFirstLSN();
+                checkpointManager.tryCheckpoint(lockedLSN);
+                synchronized (t) {
+                    t.notifyAll();
+                }
+                t.join();
+                if (threadException) {
+                    throw exception;
                 }
-                insertOp.close();
-                nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             } finally {
                 nc.deInit();
             }
@@ -201,6 +259,32 @@ public class CheckpointingTest {
         }
     }
 
+    private void stallAbortTxn(Thread t, ITransactionContext txnCtx, ITransactionSubsystem txnSubsystem, TxnId txnId)
+            throws InterruptedException, HyracksDataException {
+
+        try {
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
+                synchronized (t) {
+                    t.notifyAll();
+                    t.wait();
+                }
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+                txnCtx.setTxnState(ITransactionManager.ABORTED);
+            }
+        } catch (ACIDException | HyracksDataException e) {
+            String msg = "Could not complete rollback! System is in an inconsistent state";
+            throw new ACIDException(msg, e);
+        } finally {
+            txnCtx.complete();
+            txnSubsystem.getLockManager().releaseLocks(txnCtx);
+            txnSubsystem.getCheckpointManager().completed(txnId);
+        }
+    }
+
     @Test
     public void testCorruptedCheckpointFiles() {
         try {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
index 9e7eb0d..e3cf8b8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ICheckpointManager.java
@@ -46,4 +46,19 @@ public interface ICheckpointManager extends ILifeCycleComponent {
      * @throws HyracksDataException
      */
     long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException;
-}
\ No newline at end of file
+
+    /**
+     * Secures the current low-water mark until the transaction identified by {@code id} completes.
+     *
+     * @param id
+     * @throws HyracksDataException
+     */
+    void secure(TxnId id) throws HyracksDataException;
+
+    /**
+     * Notifies this {@link ICheckpointManager} that the transaction identified by {@code id} completed.
+     *
+     * @param id
+     */
+    void completed(TxnId id);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 3ada608..736de07 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -76,7 +76,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     private final String logFilePrefix;
     private final MutableLong flushLSN;
     private final String nodeId;
-    private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>();
     private final long logFileSize;
     private final int logPageSize;
     private final AtomicLong appendLSN;
@@ -407,24 +406,20 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             /**
              * At this point, any future LogReader should read from LSN >= checkpointLSN
              */
-            synchronized (txnLogFileId2ReaderCount) {
-                for (Long id : logFileIds) {
-                    /**
-                     * Stop deletion if:
-                     * The log file which contains the checkpointLSN has been reached.
-                     * The oldest log file being accessed by a LogReader has been reached.
-                     */
-                    if (id >= checkpointLSNLogFileID
-                            || (txnLogFileId2ReaderCount.containsKey(id) && txnLogFileId2ReaderCount.get(id) > 0)) {
-                        break;
-                    }
-                    //delete old log file
-                    File file = new File(getLogFilePath(id));
-                    file.delete();
-                    txnLogFileId2ReaderCount.remove(id);
-                    if (LOGGER.isInfoEnabled()) {
-                        LOGGER.info("Deleted log file " + file.getAbsolutePath());
-                    }
+            for (Long id : logFileIds) {
+                /**
+                 * Stop deletion if:
+                 * The log file which contains the checkpointLSN has been reached.
+                 * The oldest log file being accessed by a LogReader has been reached.
+                 */
+                if (id >= checkpointLSNLogFileID) {
+                    break;
+                }
+                //delete old log file
+                File file = new File(getLogFilePath(id));
+                file.delete();
+                if (LOGGER.isInfoEnabled()) {
+                    LOGGER.info("Deleted log file " + file.getAbsolutePath());
                 }
             }
         }
@@ -450,7 +445,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
     }
 
     private long deleteAllLogFiles() {
-        txnLogFileId2ReaderCount.clear();
         List<Long> logFileIds = getLogFileIds();
         if (!logFileIds.isEmpty()) {
             for (Long id : logFileIds) {
@@ -607,7 +601,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
         RandomAccessFile raf = new RandomAccessFile(new File(logFilePath), "r");
         FileChannel newFileChannel = raf.getChannel();
         TxnLogFile logFile = new TxnLogFile(this, newFileChannel, fileId, fileId * logFileSize);
-        touchLogFile(fileId);
         return logFile;
     }
 
@@ -617,32 +610,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
             LOGGER.warn(() -> "Closing log file with id(" + logFileRef.getLogFileId() + ") with a closed channel.");
         }
         fileChannel.close();
-        untouchLogFile(logFileRef.getLogFileId());
-    }
-
-    private void touchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                txnLogFileId2ReaderCount.put(fileId, txnLogFileId2ReaderCount.get(fileId) + 1);
-            } else {
-                txnLogFileId2ReaderCount.put(fileId, 1);
-            }
-        }
-    }
-
-    private void untouchLogFile(long fileId) {
-        synchronized (txnLogFileId2ReaderCount) {
-            if (txnLogFileId2ReaderCount.containsKey(fileId)) {
-                int newReaderCount = txnLogFileId2ReaderCount.get(fileId) - 1;
-                if (newReaderCount < 0) {
-                    throw new IllegalStateException(
-                            "Invalid log file reader count (ID=" + fileId + ", count: " + newReaderCount + ")");
-                }
-                txnLogFileId2ReaderCount.put(fileId, newReaderCount);
-            } else {
-                throw new IllegalStateException("Trying to close log file id(" + fileId + ") which was not opened.");
-            }
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index a541bd9..6efd0e5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -22,10 +22,15 @@ import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * An implementation of {@link ICheckpointManager} that defines the logic
  * of checkpoints.
@@ -33,9 +38,12 @@ import org.apache.logging.log4j.Logger;
 public class CheckpointManager extends AbstractCheckpointManager {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long NO_SECURED_LSN = -1l;
+    private final Map<TxnId, Long> securedLSNs;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
+        securedLSNs = new HashMap<>();
     }
 
     /**
@@ -62,6 +70,10 @@ public class CheckpointManager extends AbstractCheckpointManager {
     @Override
     public synchronized long tryCheckpoint(long checkpointTargetLSN) throws HyracksDataException {
         LOGGER.info("Attemping soft checkpoint...");
+        final long minSecuredLSN = getMinSecuredLSN();
+        if (minSecuredLSN != NO_SECURED_LSN && checkpointTargetLSN >= minSecuredLSN) {
+            return minSecuredLSN;
+        }
         final long minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
         if (!checkpointSucceeded) {
@@ -77,4 +89,18 @@ public class CheckpointManager extends AbstractCheckpointManager {
         }
         return minFirstLSN;
     }
-}
\ No newline at end of file
+
+    @Override
+    public synchronized void secure(TxnId id) throws HyracksDataException {
+        securedLSNs.put(id, txnSubsystem.getRecoveryManager().getMinFirstLSN());
+    }
+
+    @Override
+    public synchronized void completed(TxnId id) {
+        securedLSNs.remove(id);
+    }
+
+    private synchronized long getMinSecuredLSN() {
+        return securedLSNs.isEmpty() ? NO_SECURED_LSN : Collections.min(securedLSNs.values());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e6587f62/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 76ecc63..c218dec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.Level;
@@ -103,10 +104,11 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
                 LogRecord logRecord = new LogRecord();
                 TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, false);
                 txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getCheckpointManager().secure(txnId);
                 txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
                 txnCtx.setTxnState(ITransactionManager.ABORTED);
             }
-        } catch (ACIDException e) {
+        } catch (HyracksDataException e) {
             String msg = "Could not complete rollback! System is in an inconsistent state";
             if (LOGGER.isErrorEnabled()) {
                 LOGGER.log(Level.ERROR, msg, e);
@@ -116,6 +118,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
             txnCtxRepository.remove(txnCtx.getTxnId());
+            txnSubsystem.getCheckpointManager().completed(txnId);
         }
     }
 


[4/5] asterixdb git commit: [NO ISSUE][CLUS] Asynchronous reregistration with CC

Posted by im...@apache.org.
[NO ISSUE][CLUS] Asynchronous reregistration with CC

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Perform CC reregistration asynchronously on a
  separate thread when connection is restored.
- Ensure a single registration is sent to CC when
  connection is restored.
- Restart JVM on unexpected CC registration request
  failures.
- Halt JVM on registration failures or timeout.

Change-Id: I404256e6f550c42a6eaf17c0ae4defb7ffb9cb2f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2505
Reviewed-by: Michael Blow <mb...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/01d7fe9a
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/01d7fe9a
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/01d7fe9a

Branch: refs/heads/release-0.9.4-pre-rc
Commit: 01d7fe9aee2b21481abc6cd896bf873e5d8f8df4
Parents: 4d0de7d
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Mar 21 05:13:09 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Mar 20 21:34:05 2018 -0700

----------------------------------------------------------------------
 .../apache/hyracks/control/nc/CcConnection.java | 39 ++++++++++++++++++--
 .../control/nc/NodeControllerService.java       | 11 +++---
 2 files changed, 41 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/01d7fe9a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
index 1c6c98e..dce7d35 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/CcConnection.java
@@ -18,10 +18,15 @@
  */
 package org.apache.hyracks.control.nc;
 
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.control.common.base.IClusterController;
 import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
+import org.apache.hyracks.util.ExitUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -31,6 +36,7 @@ public class CcConnection {
 
     private final IClusterController ccs;
     private boolean registrationPending;
+    private boolean registrationCompleted;
     private Exception registrationException;
     private NodeParameters nodeParameters;
 
@@ -57,12 +63,14 @@ public class CcConnection {
     public synchronized CcId registerNode(NodeRegistration nodeRegistration, int registrationId) throws Exception {
         registrationPending = true;
         ccs.registerNode(nodeRegistration, registrationId);
-        while (registrationPending) {
-            wait();
+        try {
+            InvokeUtil.runWithTimeout(this::wait, () -> !registrationPending, 2, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            registrationException = e;
         }
         if (registrationException != null) {
-            LOGGER.log(Level.WARN, "Registering with {} failed with exception", this, registrationException);
-            throw registrationException;
+            LOGGER.fatal("Registering with {} failed with exception", this, registrationException);
+            ExitUtil.halt(ExitUtil.EC_IMMEDIATE_HALT);
         }
         return getCcId();
     }
@@ -74,4 +82,27 @@ public class CcConnection {
     public NodeParameters getNodeParameters() {
         return nodeParameters;
     }
+
+    public synchronized void notifyConnectionRestored(NodeControllerService ncs, InetSocketAddress ccAddress)
+            throws InterruptedException {
+        if (registrationCompleted) {
+            registrationCompleted = false;
+            ncs.getExecutor().submit(() -> {
+                try {
+                    return ncs.registerNode(this, ccAddress);
+                } catch (Exception e) {
+                    LOGGER.log(Level.ERROR, "Failed registering with cc", e);
+                    throw new IllegalStateException(e);
+                }
+            });
+        }
+        while (!registrationCompleted) {
+            wait();
+        }
+    }
+
+    public synchronized void notifyRegistrationCompleted() {
+        registrationCompleted = true;
+        notifyAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/01d7fe9a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 0756210..98f5c70 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -352,10 +352,11 @@ public class NodeControllerService implements IControllerService {
                 @Override
                 public void ipcHandleRestored(IIPCHandle handle) throws IPCException {
                     // we need to re-register in case of NC -> CC connection reset
+                    final CcConnection ccConnection = getCcConnection(ccAddressMap.get(ccAddress));
                     try {
-                        registerNode(getCcConnection(ccAddressMap.get(ccAddress)), ccAddress);
-                    } catch (Exception e) {
-                        LOGGER.log(Level.WARN, "Failed Registering with cc", e);
+                        ccConnection.notifyConnectionRestored(NodeControllerService.this, ccAddress);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         throw new IPCException(e);
                     }
                 }
@@ -412,7 +413,7 @@ public class NodeControllerService implements IControllerService {
         }
     }
 
-    private CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
+    public CcId registerNode(CcConnection ccc, InetSocketAddress ccAddress) throws Exception {
         LOGGER.info("Registering with Cluster Controller {}", ccc);
 
         int registrationId = nextRegistrationId.incrementAndGet();
@@ -444,7 +445,7 @@ public class NodeControllerService implements IControllerService {
             ccTimer.schedule(new ProfileDumpTask(ccs, ccId), 0, nodeParameters.getProfileDumpPeriod());
             ccTimers.put(ccId, ccTimer);
         }
-
+        ccc.notifyRegistrationCompleted();
         LOGGER.info("Registering with Cluster Controller {} complete", ccc);
         return ccId;
     }