You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/09/24 14:32:17 UTC

[4/7] asterixdb git commit: [NO ISSUE][STO] Ensure Files From Failed Bulkload Are Deleted

[NO ISSUE][STO] Ensure Files From Failed Bulkload Are Deleted

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- If a bulkload is interrupted at its end stage (i.e. after all
  tuples have been added but before the component is added to
  the index's disk components), the bulkload operation fails
  but the files from the failed operation are not deleted.
  This change ensures that if a bulkload fails at that stage,
  the files of the generated component are deleted, avoiding a
  "file is already mapped" exception if the bulkload is
  attempted again.
- Ensure disk component is only destroyed once in case of
  bulkload cleanup.
- Stop retrying the checkpoint write immediately if the failure
  was due to the thread being interrupted.

Change-Id: I7f63054ac72a9482e779e49eb4da658a08fe7e9d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2970
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3613cab7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3613cab7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3613cab7

Branch: refs/heads/master
Commit: 3613cab7691f0257b4a8befa41eedcc55ee5b680
Parents: 8078e65
Author: Murtadha Hubail <mh...@apache.org>
Authored: Tue Sep 18 13:19:02 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Sep 18 08:45:19 2018 -0700

----------------------------------------------------------------------
 .../asterix/app/nc/IndexCheckpointManager.java  |  3 ++
 .../ChainedLSMDiskComponentBulkLoader.java      |  2 +-
 .../impls/LSMIndexDiskComponentBulkLoader.java  | 44 +++++++++++++-------
 3 files changed, 34 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3613cab7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 3c62d99..7b08bad 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -196,6 +196,9 @@ public class IndexCheckpointManager implements IIndexCheckpointManager {
                 // ensure it was written correctly by reading it
                 read(checkpointPath);
                 return;
+            } catch (ClosedByInterruptException e) {
+                LOGGER.info("interrupted while writing checkpoint at {}", checkpointPath);
+                throw HyracksDataException.create(e);
             } catch (IOException e) {
                 if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
                     throw HyracksDataException.create(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3613cab7/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
index ab59b59..3fa45c9 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ChainedLSMDiskComponentBulkLoader.java
@@ -101,8 +101,8 @@ public class ChainedLSMDiskComponentBulkLoader implements ILSMDiskComponentBulkL
             for (int i = 0; i < bulkloadersCount; i++) {
                 bulkloaderChain.get(i).cleanupArtifacts();;
             }
+            diskComponent.deactivateAndDestroy();
         }
-        diskComponent.deactivateAndDestroy();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3613cab7/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
index 3a43ba7..c739ad0 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexDiskComponentBulkLoader.java
@@ -22,6 +22,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponentBulkLoader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.common.IIndexBulkLoader;
@@ -31,6 +32,7 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     private final AbstractLSMIndex lsmIndex;
     private final ILSMDiskComponentBulkLoader componentBulkLoader;
     private final ILSMIndexOperationContext opCtx;
+    private boolean failed = false;
 
     public LSMIndexDiskComponentBulkLoader(AbstractLSMIndex lsmIndex, ILSMIndexOperationContext opCtx, float fillFactor,
             boolean verifyInput, long numElementsHint) throws HyracksDataException {
@@ -68,20 +70,10 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
     @Override
     public void end() throws HyracksDataException {
         try {
-            try {
-                lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
-                componentBulkLoader.end();
-            } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
-                opCtx.getIoOperation().setStatus(LSMIOOperationStatus.FAILURE);
-                opCtx.getIoOperation().setFailure(th);
-                throw th;
-            } finally {
-                lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
-            }
-            if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
-                    && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
-                lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
-            }
+            presistComponentToDisk();
+        } catch (Throwable th) { // NOSONAR must cleanup in case of any failure
+            fail(th);
+            throw th;
         } finally {
             lsmIndex.getIOOperationCallback().completed(opCtx.getIoOperation());
         }
@@ -116,4 +108,28 @@ public class LSMIndexDiskComponentBulkLoader implements IIndexBulkLoader {
         return opCtx.getIoOperation().getFailure();
     }
 
+    private void presistComponentToDisk() throws HyracksDataException {
+        try {
+            lsmIndex.getIOOperationCallback().afterOperation(opCtx.getIoOperation());
+            componentBulkLoader.end();
+        } catch (Throwable th) { // NOSONAR Must not call afterFinalize without setting failure
+            fail(th);
+            throw th;
+        } finally {
+            lsmIndex.getIOOperationCallback().afterFinalize(opCtx.getIoOperation());
+        }
+        if (opCtx.getIoOperation().getStatus() == LSMIOOperationStatus.SUCCESS
+                && opCtx.getIoOperation().getNewComponent().getComponentSize() > 0) {
+            lsmIndex.getHarness().addBulkLoadedComponent(opCtx.getIoOperation());
+        }
+    }
+
+    private void fail(Throwable th) {
+        if (!failed) {
+            failed = true;
+            final ILSMIOOperation loadOp = opCtx.getIoOperation();
+            loadOp.setFailure(th);
+            lsmIndex.cleanUpFilesForFailedOperation(loadOp);
+        }
+    }
 }
\ No newline at end of file