You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ti...@apache.org on 2017/01/26 09:42:17 UTC

asterixdb git commit: Minor cleanups for JobManager and ICCApplicationContext.

Repository: asterixdb
Updated Branches:
  refs/heads/master e0c232d27 -> 9d30640f2


Minor cleanups for JobManager and ICCApplicationContext.

Change-Id: Iba7b6fd9b75a141fcc9f589db4b8cdeac570ec2d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1471
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9d30640f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9d30640f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9d30640f

Branch: refs/heads/master
Commit: 9d30640f26ed832f98b08c5153ec2bdc0cbe5f91
Parents: e0c232d
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Wed Jan 25 17:02:58 2017 -0800
Committer: Till Westmann <ti...@apache.org>
Committed: Thu Jan 26 00:55:47 2017 -0800

----------------------------------------------------------------------
 .../api/application/ICCApplicationContext.java  |  1 -
 .../control/cc/ClusterControllerService.java    | 21 +++++++++++++++-----
 .../hyracks/control/cc/job/IJobManager.java     |  7 ++++---
 .../hyracks/control/cc/job/JobManager.java      | 11 +++++++---
 .../cc/work/WaitForJobCompletionWork.java       |  2 +-
 5 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d30640f/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 5f4877d..55cc5fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 
 import org.apache.hyracks.api.context.ICCContext;
 import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.api.job.resource.IJobCapacityController;
 
 /**
  * Application Context at the Cluster Controller for an application.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d30640f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 1a363c7..ae0f361 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -21,6 +21,7 @@ package org.apache.hyracks.control.cc;
 import java.io.File;
 import java.io.FileReader;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -54,11 +55,12 @@ import org.apache.hyracks.control.cc.cluster.NodeManager;
 import org.apache.hyracks.control.cc.dataset.DatasetDirectoryService;
 import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
 import org.apache.hyracks.control.cc.job.IJobManager;
+import org.apache.hyracks.control.cc.job.JobManager;
 import org.apache.hyracks.control.cc.scheduler.IResourceManager;
 import org.apache.hyracks.control.cc.scheduler.ResourceManager;
 import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
+import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
 import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
 import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
@@ -203,10 +205,19 @@ public class ClusterControllerService implements IControllerService {
         }
 
         // Job manager is in charge of job lifecycle management.
-        Constructor<?> jobManagerConstructor = this.getClass().getClassLoader().loadClass(ccConfig.jobManagerClassName)
-                .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
-        jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
-
+        try {
+            Constructor<?> jobManagerConstructor = this.getClass().getClassLoader()
+                    .loadClass(ccConfig.jobManagerClassName)
+                    .getConstructor(CCConfig.class, ClusterControllerService.class, IJobCapacityController.class);
+            jobManager = (IJobManager) jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
+                | InvocationTargetException e) {
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "class " + ccConfig.jobManagerClassName + " could not be used: ", e);
+            }
+            // Falls back to the default implementation if the user-provided class name is not valid.
+            jobManager = new JobManager(ccConfig, this, jobCapacityController);
+        }
     }
 
     private void connectNCs() throws Exception {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d30640f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
index 8f621df..180e850 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/IJobManager.java
@@ -86,13 +86,14 @@ public interface IJobManager {
     JobRun get(JobId jobId);
 
     /**
-     * Retrieves a historical job from a given job id.
+     * Retrieves the exception records for a historical job.
      *
      * @param jobId,
      *            the job id.
-     * @return the matched historical jobs that have been run but not yet discarded.
+     * @return the exception records of a job that has been removed from the archive but still stays in the stored
+     *         history, where each historical job is paired with exceptions during its execution.
      */
-    List<Exception> getRunHistory(JobId jobId);
+    List<Exception> getExceptionHistory(JobId jobId);
 
     /**
      * @return all jobs that are currently running.

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d30640f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 52ad301..e3f9557 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.application.CCApplicationContext;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
+import org.apache.hyracks.control.cc.scheduler.FIFOJobQueue;
 import org.apache.hyracks.control.cc.scheduler.IJobQueue;
 import org.apache.hyracks.control.cc.work.JobCleanupWork;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -58,11 +59,11 @@ public class JobManager implements IJobManager {
     private static final Logger LOGGER = Logger.getLogger(JobManager.class.getName());
 
     private final ClusterControllerService ccs;
-    private final IJobQueue jobQueue;
     private final Map<JobId, JobRun> activeRunMap;
     private final Map<JobId, JobRun> runMapArchive;
     private final Map<JobId, List<Exception>> runMapHistory;
     private final IJobCapacityController jobCapacityController;
+    private IJobQueue jobQueue;
 
     public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController)
             throws HyracksException {
@@ -74,7 +75,11 @@ public class JobManager implements IJobManager {
             jobQueue = (IJobQueue) jobQueueConstructor.newInstance(this, this.jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
-            throw HyracksException.create(ErrorCode.CLASS_LOADING_ISSUE, e, e.getMessage());
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "class " + ccConfig.jobQueueClassName + " could not be used: ", e);
+            }
+            // Falls back to the default implementation if the user-provided class name is not valid.
+            jobQueue = new FIFOJobQueue(this, jobCapacityController);
         }
         activeRunMap = new HashMap<>();
         runMapArchive = new LinkedHashMap<JobId, JobRun>() {
@@ -248,7 +253,7 @@ public class JobManager implements IJobManager {
     }
 
     @Override
-    public List<Exception> getRunHistory(JobId jobId) {
+    public List<Exception> getExceptionHistory(JobId jobId) {
         return runMapHistory.get(jobId);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9d30640f/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index f7ef175..713cf96 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -55,7 +55,7 @@ public class WaitForJobCompletionWork extends SynchronizableWork {
                 }
             });
         } else {
-            final List<Exception> exceptions = jobManager.getRunHistory(jobId);
+            final List<Exception> exceptions = jobManager.getExceptionHistory(jobId);
             ccs.getExecutor().execute(new Runnable() {
                 @Override
                 public void run() {