Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/02/13 19:05:32 UTC

asterixdb git commit: [ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs

Repository: asterixdb
Updated Branches:
  refs/heads/master e19da1fc3 -> e8e78e24a


[ASTERIXDB-2271][RT] Remove Result Ref of Aborted Jobs

- user model changes: no
- storage format changes: no
- interface changes: yes
  - IDatasetPartitionManager (-) abortAllReaders

Details:
- Currently, it is possible for the same result reference
  to be reused by two different jobs. This change fixes
  the issue by removing the old references of aborted jobs
  (see the condensed sketch after this list).
- Abort job tasks before aborting result readers to stop
  result generation.
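
For reference, the new flow in AbortAllJobsWork (shown in full in the
last hunk of this patch) boils down to the following; dpm, ncs,
joblets, and ccId are the fields/locals of the surrounding doRun(),
as in the diff:

    joblets.stream()
            .filter(joblet -> joblet.getJobId().getCcId().equals(ccId))
            .forEach(joblet -> {
                // abort the job's tasks first so result generation stops
                for (Task task : joblet.getTaskMap().values()) {
                    task.abort();
                }
                final JobId jobId = joblet.getJobId();
                if (dpm != null) {
                    dpm.abortReader(jobId); // stop readers of this job's results
                    dpm.sweep(jobId); // drop the job's result references
                }
                // finally, clean up the joblet with FAILURE status
                ncs.getWorkQueue().schedule(
                        new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
            });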

Change-Id: I8170887e007d63b143ef08a3a8e149ab3866fcb1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2386
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/e8e78e24
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/e8e78e24
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/e8e78e24

Branch: refs/heads/master
Commit: e8e78e24a0c151947c67888b83d603f8080a490f
Parents: e19da1f
Author: Murtadha Hubail <mh...@apache.org>
Authored: Mon Feb 12 06:59:29 2018 +0300
Committer: Murtadha Hubail <mh...@apache.org>
Committed: Tue Feb 13 11:04:34 2018 -0800

----------------------------------------------------------------------
 .../api/dataset/IDatasetPartitionManager.java   |  2 --
 .../nc/dataset/DatasetPartitionManager.java     | 29 ++++++--------------
 .../control/nc/work/AbortAllJobsWork.java       | 22 +++++++--------
 3 files changed, 20 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
index e6cf6d3..b1e203f 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -39,8 +39,6 @@ public interface IDatasetPartitionManager extends IDatasetManager {
 
     void abortReader(JobId jobId);
 
-    void abortAllReaders();
-
     void close();
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
index fb7308e..b7cf9a4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.hyracks.control.nc.dataset;
 
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -65,7 +65,7 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
         } else {
             datasetMemoryManager = null;
         }
-        partitionResultStateMap = new LinkedHashMap<>();
+        partitionResultStateMap = new HashMap<>();
         executor.execute(new ResultStateSweeper(this, resultSweepThreshold, LOGGER));
     }
 
@@ -77,14 +77,11 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
         synchronized (this) {
             dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, asyncMode, orderedResult, partition, nPartitions,
                     datasetMemoryManager, fileFactory, maxReads);
-
             ResultSetMap rsIdMap = partitionResultStateMap.computeIfAbsent(jobId, k -> new ResultSetMap());
-
             ResultState[] resultStates = rsIdMap.createOrGetResultStates(rsId, nPartitions);
             resultStates[partition] = dpw.getResultState();
         }
-
-        LOGGER.debug("Initialized partition writer: JobId: " + jobId + ":partition: " + partition);
+        LOGGER.debug("Initialized partition writer: JobId: {}:partition: {}", jobId, partition);
         return dpw;
     }
 
@@ -103,8 +100,8 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
     @Override
     public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
         try {
-            LOGGER.debug("Reporting partition write completion: JobId: " + jobId + ": ResultSetId: " + rsId
-                    + ":partition: " + partition);
+            LOGGER.debug("Reporting partition write completion: JobId: {}:ResultSetId: {}:partition: {}", jobId, rsId,
+                    partition);
             ncs.getClusterController(jobId.getCcId()).reportResultPartitionWriteCompletion(jobId, rsId, partition);
         } catch (Exception e) {
             throw HyracksException.create(e);
@@ -117,11 +114,11 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
         ResultState resultState = getResultState(jobId, resultSetId, partition);
         DatasetPartitionReader dpr = new DatasetPartitionReader(this, datasetMemoryManager, executor, resultState);
         dpr.writeTo(writer);
-        LOGGER.debug("Initialized partition reader: JobId: " + jobId + ":ResultSetId: " + resultSetId + ":partition: "
-                + partition);
+        LOGGER.debug("Initialized partition reader: JobId: {}:ResultSetId: {}:partition: {}", jobId, resultSetId,
+                partition);
     }
 
-    protected synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
+    private synchronized ResultState getResultState(JobId jobId, ResultSetId resultSetId, int partition)
             throws HyracksException {
         ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
         if (rsIdMap == null) {
@@ -155,13 +152,6 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
     }
 
     @Override
-    public synchronized void abortAllReaders() {
-        for (ResultSetMap rsIdMap : partitionResultStateMap.values()) {
-            rsIdMap.abortAll();
-        }
-    }
-
-    @Override
     public synchronized void close() {
         for (JobId jobId : getJobIds()) {
             deinit(jobId);
@@ -175,7 +165,7 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
     }
 
     @Override
-    public ResultSetMap getState(JobId jobId) {
+    public synchronized ResultSetMap getState(JobId jobId) {
         return partitionResultStateMap.get(jobId);
     }
 
@@ -191,5 +181,4 @@ public class DatasetPartitionManager extends AbstractDatasetManager implements I
             rsIdMap.closeAndDeleteAll();
         }
     }
-
 }
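
Neither abortReader() nor the sweep path is fully visible in this hunk;
as a minimal sketch (assuming abortReader() delegates to the job's
ResultSetMap, and that sweeping removes and closes the job's entry via
the method whose tail is shown at the end of the hunk above, presumably
deinit()), the per-job paths amount to:

    // sketch only; the actual bodies live in DatasetPartitionManager
    // and its AbstractDatasetManager parent, outside this diff
    public synchronized void abortReader(JobId jobId) {
        ResultSetMap rsIdMap = partitionResultStateMap.get(jobId);
        if (rsIdMap != null) {
            rsIdMap.abortAll(); // abort readers of every result set of the job
        }
    }

    public synchronized void sweep(JobId jobId) {
        // removing the entry is what keeps an aborted job's result
        // reference from being handed to a later job
        ResultSetMap rsIdMap = partitionResultStateMap.remove(jobId);
        if (rsIdMap != null) {
            rsIdMap.closeAndDeleteAll();
        }
    }

With abortAllReaders() gone, nothing iterates partitionResultStateMap
in insertion order anymore, which is presumably why the LinkedHashMap
above could become a plain HashMap.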

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/e8e78e24/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
index 68d677f..2bcf414 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortAllJobsWork.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
 import org.apache.hyracks.control.nc.Joblet;
@@ -50,19 +51,18 @@ public class AbortAllJobsWork extends SynchronizableWork {
             LOGGER.log(Level.WARN, "DatasetPartitionManager is null on " + ncs.getId());
         }
         Collection<Joblet> joblets = ncs.getJobletMap().values();
-        for (Joblet ji : joblets) {
-            // TODO(mblow): should we have one jobletmap per cc?
-            if (!ji.getJobId().getCcId().equals(ccId)) {
-                continue;
-            }
-            if (dpm != null) {
-                dpm.abortReader(ji.getJobId());
-            }
-            Collection<Task> tasks = ji.getTaskMap().values();
+        // TODO(mblow): should we have one jobletmap per cc?
+        joblets.stream().filter(joblet -> joblet.getJobId().getCcId().equals(ccId)).forEach(joblet -> {
+            Collection<Task> tasks = joblet.getTaskMap().values();
             for (Task task : tasks) {
                 task.abort();
             }
-            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, ji.getJobId(), JobStatus.FAILURE));
-        }
+            final JobId jobId = joblet.getJobId();
+            if (dpm != null) {
+                dpm.abortReader(jobId);
+                dpm.sweep(jobId);
+            }
+            ncs.getWorkQueue().schedule(new CleanupJobletWork(ncs, jobId, JobStatus.FAILURE));
+        });
     }
 }