You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2014/11/25 15:02:05 UTC

svn commit: r1641620 [2/2] - in /uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator: ./ factory/

Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactoryV2.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactoryV2.java?rev=1641620&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactoryV2.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactoryV2.java Tue Nov 25 14:02:05 2014
@@ -0,0 +1,704 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator.factory;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.container.FlagsHelper;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccProperties;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.QuotedOptions;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.Utils;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+import org.apache.uima.ducc.common.utils.id.IDuccIdFactory;
+import org.apache.uima.ducc.orchestrator.CGroupManager;
+import org.apache.uima.ducc.orchestrator.JobDriverHostManager;
+import org.apache.uima.ducc.orchestrator.OrchestratorCommonArea;
+import org.apache.uima.ducc.transport.cmdline.ACommandLine;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.cmdline.NonJavaCommandLine;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ServiceRequestProperties;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.DuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.DuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.DuccUimaAggregate;
+import org.apache.uima.ducc.transport.event.common.DuccUimaAggregateComponent;
+import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
+import org.apache.uima.ducc.transport.event.common.IDuccCommand;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregate;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregateComponent;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService.ServiceDeploymentType;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+
+public class JobFactoryV2 implements IJobFactory {
+	private static JobFactoryV2 jobFactory = new JobFactoryV2();
+	private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JobFactoryV2.class.getName());
+	
+	public static IJobFactory getInstance() {
+		return jobFactory;
+	}
+	
+	private long driver_max_size_in_bytes = 0;
+
+	private JobFactoryV2() {
+		String ducc_jd_share_quantum = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_jd_share_quantum);
+		long oneKB = 1024;
+		long oneMB = 1024*oneKB;
+		driver_max_size_in_bytes = Long.parseLong(ducc_jd_share_quantum) * oneMB;
+	}
+	
+	private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+	private IDuccIdFactory duccIdFactory = orchestratorCommonArea.getDuccIdFactory();
+	private JobDriverHostManager hostManager = orchestratorCommonArea.getHostManager();
+	private DuccIdFactory jdIdFactory = new DuccIdFactory();
+	
+	private String classpath_order = System.getProperty("ducc.orchestrator.job.factory.classpath.order");
+	
+	private int addEnvironment(DuccWorkJob job, String type, ACommandLine aCommandLine, String environmentVariables) {
+		String methodName = "addEnvironment";
+		logger.trace(methodName, job.getDuccId(), "enter");
+		int retVal = 0;
+		if(environmentVariables != null) {
+			logger.debug(methodName, job.getDuccId(), environmentVariables);
+			// Tokenize the list of assignments, dequote, and convert to a map of environment settings
+			ArrayList<String> envVarList = QuotedOptions.tokenizeList(environmentVariables, true);
+			Map<String, String> envMap;
+			try {
+			    envMap = QuotedOptions.parseAssignments(envVarList, false);
+			} catch (IllegalArgumentException e) {
+                logger.warn(methodName, job.getDuccId(),"Invalid environment syntax in: " + environmentVariables);
+                return 0;  // Should not happen as CLI should have checked and rejected the request
+			}
+			aCommandLine.addEnvironment(envMap);
+			retVal = envMap.size();
+		}
+		logger.trace(methodName, job.getDuccId(), "exit");
+		return retVal;
+	}
+	
+	private void checkSpec(DuccWorkJob job, JobRequestProperties jobRequestProperties) {
+		String methodName = "checkSpec";
+		logger.trace(methodName, job.getDuccId(), "enter");
+		jobRequestProperties.normalize();
+		Enumeration<Object> keys = jobRequestProperties.keys();
+		while(keys.hasMoreElements()) {
+			String key = (String) keys.nextElement();
+			if(!jobRequestProperties.isRecognized(key)) {
+				logger.warn(methodName, job.getDuccId(), "unrecognized: "+key);
+			}
+		}
+		logger.trace(methodName, job.getDuccId(), "exit");
+		return;
+	}
+	
+	private ArrayList<String> toArrayList(String overrides) {
+		String methodName = "toArrayList";
+		logger.trace(methodName, null, "enter");
+		// To match other lists tokenize on blanks & strip any quotes around values.
+		ArrayList<String> list = QuotedOptions.tokenizeList(overrides, true);
+		logger.trace(methodName, null, "exit");
+		return list;
+	}
+
+	private void dump(DuccWorkJob job, IDuccUimaAggregate uimaAggregate) {
+		String methodName = "dump";
+		logger.info(methodName, job.getDuccId(), "brokerURL     "+uimaAggregate.getBrokerURL());
+		logger.info(methodName, job.getDuccId(), "endpoint      "+uimaAggregate.getEndpoint());
+		logger.info(methodName, job.getDuccId(), "description   "+uimaAggregate.getDescription());
+		logger.info(methodName, job.getDuccId(), "name          "+uimaAggregate.getName());
+		logger.info(methodName, job.getDuccId(), "thread-count  "+uimaAggregate.getThreadCount());
+		List<IDuccUimaAggregateComponent> components = uimaAggregate.getComponents();
+		for(IDuccUimaAggregateComponent component : components) {
+			logger.info(methodName, job.getDuccId(), "descriptor    "+component.getDescriptor());
+			List<String> overrides = component.getOverrides();
+			for(String override : overrides) {
+				logger.info(methodName, job.getDuccId(), "override      "+override);
+			}
+		}
+	}
+	
+	private void dump(DuccWorkJob job, IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor) {
+		String methodName = "dump";
+		logger.info(methodName, job.getDuccId(), "uimaDeploymentDescriptor      "+uimaDeploymentDescriptor);
+	}
+	
+	private void logSweeper(String logDir, DuccId jobId) {
+		String methodName = "logSweeper";
+		if(logDir != null) {
+			if(jobId != null) {
+				if(!logDir.endsWith(File.separator)) {
+					logDir += File.separator;
+				}
+				logDir += jobId;
+				try {
+					File file = new File(logDir);
+					if(file.exists()) {
+						File dest = new File(logDir+"."+"sweep"+"."+java.util.Calendar.getInstance().getTime().toString());
+						file.renameTo(dest);
+						logger.warn(methodName, jobId, "renamed "+logDir);
+					}
+				}
+				catch(Throwable t) {
+					logger.warn(methodName, jobId, "unable to rename "+logDir, t);
+				}
+			}
+			else {
+				logger.warn(methodName, jobId, "jobId is null");
+			}
+		}
+		else {
+			logger.warn(methodName, jobId, "logDir is null");
+		}
+	}
+	
+	private boolean isClasspathOrderUserBeforeDucc(String user_specified_classpath_order, DuccId jobId) {
+		String methodName = "isClasspathOrderUserBeforeDucc";
+		boolean retVal = false;
+		if(user_specified_classpath_order != null) {
+			if(user_specified_classpath_order.trim().equals("user-before-ducc")) {
+				logger.warn(methodName, jobId, "user specified classpath order: "+user_specified_classpath_order);
+				retVal = true;
+			}
+		}
+		else if(classpath_order != null) {
+			if(classpath_order.trim().equals("user-before-ducc")) {
+				retVal = true;
+			}
+		}
+		return retVal;
+	}
+	
+	private boolean isJpUima(DuccType duccType, ServiceDeploymentType serviceDeploymentType) {
+		boolean retVal = true;
+		switch(duccType) {
+		case Job:
+			break;
+		case Service:
+			switch(serviceDeploymentType) {
+			case uima:
+				break;
+			case custom:	
+			case other:	
+			default:
+				retVal = false;
+				break;
+			}
+			break;
+		case Reservation:
+		case Pop:
+		case Undefined:
+		default:
+			//huh?
+			retVal = false;
+			break;
+		}
+		return retVal;
+	}
+	
+	private void setDebugPorts(CommonConfiguration common, JobRequestProperties jobRequestProperties,  DuccWorkJob job) {
+		String location = "setDebugPorts";
+		DuccId jobid = job.getDuccId();
+		String portDriver = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_debug);
+		if(portDriver != null) {
+			try {
+				long port = Long.parseLong(portDriver);
+				job.setDebugPortDriver(port);
+				logger.debug(location, jobid, "Driver debug port: "+job.getDebugPortDriver());
+			}
+			catch(Exception e) {
+				logger.error(location, jobid, "Invalid driver debug port: "+portDriver);
+			}
+		}
+		String portProcess = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_debug);
+		if(portProcess != null) {
+			try {
+				long port = Long.parseLong(portProcess);
+				job.setDebugPortProcess(port);
+				logger.debug(location, jobid, "Process debug port: "+job.getDebugPortProcess());
+			}
+			catch(Exception e) {
+				logger.error(location, jobid, "Invalid process debug port: "+portProcess);
+			}
+		}
+	}
+	
+	private String buildJobDriverClasspath(JobRequestProperties jobRequestProperties, DuccId jobid) {
+		String methodName = "buildJobDriverClasspath";
+		String cp = null;
+		cp = getDuccClasspath(0);
+		logger.debug(methodName, jobid, "java CP:"+cp);	
+		return cp;
+	}
+	
+	private JavaCommandLine buildJobDriverCommandLine(JobRequestProperties jobRequestProperties, DuccId jobid) {
+		JavaCommandLine jcl = null;
+		// java command
+		String javaCmd = jobRequestProperties.getProperty(JobSpecificationProperties.key_jvm);
+		jcl = new JavaCommandLine(javaCmd);
+		jcl.setClassName(IDuccCommand.main);
+		jcl.addOption(IDuccCommand.arg_ducc_deploy_configruation);
+		jcl.addOption(IDuccCommand.arg_ducc_deploy_components);
+		jcl.addOption(IDuccCommand.arg_ducc_job_id+jobid.toString());
+		String cp = buildJobDriverClasspath(jobRequestProperties, jobid);
+		jcl.setClasspath(cp);
+		// Add the user-provided JVM args
+		boolean haveXmx = false;
+		String driver_jvm_args = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_jvm_args);
+		ArrayList<String> dTokens = QuotedOptions.tokenizeList(driver_jvm_args, true);
+		for(String token : dTokens) {
+			jcl.addOption(token);
+			if (!haveXmx) {
+			    haveXmx = token.startsWith("-Xmx");
+			}
+		}
+		// Add any site-provided JVM args, but not -Xmx if the user has provided one
+		String siteJvmArgs = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_driver_jvm_args);
+		dTokens = QuotedOptions.tokenizeList(siteJvmArgs, true);    // a null arg is acceptable
+		for (String token : dTokens) {
+		    if (!haveXmx || !token.startsWith("-Xmx")) {
+		       jcl.addOption(token);
+		    }
+		}
+		// Add job JVM args
+			// add JobId
+			String opt;
+			opt = FlagsHelper.Name.JobId.dname()+"="+jobid.getFriendly();
+			jcl.addOption(opt);
+			// add CrXML
+			String crxml = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR);
+			if(crxml != null) {
+				opt = FlagsHelper.Name.CollectionReaderXml.dname()+"="+crxml;
+				jcl.addOption(opt);
+			}
+			// add CrCfg
+			String crcfg = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR_overrides);
+			if(crcfg != null) {
+				opt = FlagsHelper.Name.CollectionReaderCfg.dname()+"="+crcfg;
+				jcl.addOption(opt);
+			}
+			// add userCP
+			String userCP = jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath);
+			if(userCP == null) {
+				userCP = "";
+			}
+			String augment = IDuccEnv.DUCC_HOME+File.separator+"lib"+File.separator+"uima-ducc"+File.separator+"*";
+			userCP = augment+File.pathSeparator+userCP;
+			opt = FlagsHelper.Name.UserClasspath.dname()+"="+userCP;
+			jcl.addOption(opt);
+		// Name the log config file explicitly - the default of searching the user-provided classpath is dangerous
+		jcl.addOption("-Dlog4j.configuration=file://" + Utils.findDuccHome() + "/resources/log4j.xml");
+		// Log directory
+		jcl.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+		return jcl;
+	}
+	
+	private void createDriver(CommonConfiguration common, JobRequestProperties jobRequestProperties,  DuccWorkJob job) {
+		String methodName = "createDriver";
+		DuccPropertiesResolver duccPropertiesResolver = DuccPropertiesResolver.getInstance();
+		// broker & queue
+		job.setJobBroker(common.brokerUrl);
+		job.setJobQueue(common.jdQueuePrefix+job.getDuccId());
+		// CR
+		String crxml = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR);
+		String crcfg = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR_overrides);
+		// getMeta
+		String meta_time = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.default_process_get_meta_time_max);
+		// lost
+		String lost_time = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.ducc_jd_queue_timeout_minutes);
+		// process_per_item_time_max
+		String wi_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_per_item_time_max);
+		if(wi_time == null) {
+			wi_time = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.default_process_per_item_time_max);
+		}
+		// Exception handler
+		String processExceptionHandler = jobRequestProperties.getProperty(JobRequestProperties.key_driver_exception_handler);
+		// ProcessStatusMaxWaitMillis
+		long processStatusMaxWaitMillis = -1;
+		try {
+			String ducc_orchestrator_abbreviated_state_publish_rate = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.ducc_orchestrator_abbreviated_state_publish_rate);
+			String ducc_rm_state_publish_rate = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.ducc_rm_state_publish_rate);
+			String ducc_agent_node_metrics_publish_rate = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.ducc_agent_node_metrics_publish_rate);
+			String ducc_rm_node_stability = duccPropertiesResolver.getFileProperty(DuccPropertiesResolver.ducc_rm_node_stability);
+			long orMillisRate = Long.parseLong(ducc_orchestrator_abbreviated_state_publish_rate);
+			long rmMillisRate = Long.parseLong(ducc_rm_state_publish_rate);
+			long metricsMillisRate = Long.parseLong(ducc_agent_node_metrics_publish_rate);
+			long stabilityCount = Long.parseLong(ducc_rm_node_stability);
+			long fudgeMillis = 10 * 1000;
+			processStatusMaxWaitMillis = metricsMillisRate*stabilityCount+(orMillisRate+rmMillisRate+fudgeMillis);
+			logger.debug(methodName, job.getDuccId(), "driver:processStatusMaxWaitMillis="+processStatusMaxWaitMillis);
+		}
+		catch(Exception e) {
+			logger.error(methodName, job.getDuccId(), e);
+		}
+		// Command line
+		DuccWorkPopDriver driver = new DuccWorkPopDriver(job.getjobBroker(), job.getjobQueue(), crxml, crcfg, meta_time, lost_time, wi_time, processExceptionHandler, processStatusMaxWaitMillis);
+		JavaCommandLine driverCommandLine = buildJobDriverCommandLine(jobRequestProperties, job.getDuccId());
+		// Environment
+		String driverEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
+		int envCountDriver = addEnvironment(job, "driver", driverCommandLine, driverEnvironmentVariables);
+		logger.info(methodName, job.getDuccId(), "driver env vars: "+envCountDriver);
+		logger.debug(methodName, job.getDuccId(), "driver: "+driverCommandLine.getCommand());
+		driver.setCommandLine(driverCommandLine);
+		//
+		NodeIdentity nodeIdentity = hostManager.getNode();
+		DuccId duccId = jdIdFactory.next();
+		duccId.setFriendly(0);
+		DuccProcess driverProcess = new DuccProcess(duccId,nodeIdentity,ProcessType.Pop);
+		CGroupManager.assign(job.getDuccId(), driverProcess, driver_max_size_in_bytes);
+		driverProcess.setResourceState(ResourceState.Allocated);
+		driverProcess.setNodeIdentity(nodeIdentity);
+		driver.getProcessMap().put(driverProcess.getDuccId(), driverProcess);
+		//
+		orchestratorCommonArea.getProcessAccounting().addProcess(duccId, job.getDuccId());
+		//
+		job.setDriver(driver);
+	}
+	
+	private void checkSchedulingLimits(DuccWorkJob job, DuccSchedulingInfo schedulingInfo) {
+	    String methodName = "checkSpec";
+        long limit = 0;
+        String p_limit = DuccPropertiesResolver.get(DuccPropertiesResolver.ducc_threads_limit);
+        if (p_limit != null) {
+            if (!p_limit.equals("unlimited")) {
+                limit = Long.parseLong(p_limit);
+            }
+        }
+        if (limit <= 0) {
+            return;
+        }
+        int threads_per_share = schedulingInfo.getIntThreadsPerShare();
+        long sharesLimit = limit / threads_per_share;
+        long maxShares = schedulingInfo.getLongSharesMax();
+        if (maxShares == 0 || maxShares > sharesLimit) {
+            logger.info(methodName, job.getDuccId(), "change max-shares from "+maxShares+" to " + sharesLimit);
+            schedulingInfo.setSharesMax(String.valueOf(sharesLimit));
+        }
+	}
+	
+	public DuccWorkJob create(CommonConfiguration common, JobRequestProperties jobRequestProperties) {
+		String methodName = "create";
+		DuccWorkJob job = new DuccWorkJob();
+		checkSpec(job, jobRequestProperties);
+		// id, type
+		String ddCr = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR);
+		if(ddCr == null) {
+			job.setDuccType(DuccType.Service);
+			job.setDuccId(duccIdFactory.next());
+		}
+		else {
+			job.setDuccType(DuccType.Job);
+			job.setDuccId(duccIdFactory.next());
+			
+		}
+		// driver
+		DuccType duccType = job.getDuccType();
+		switch(duccType) {
+			case Job:
+				createDriver(common, jobRequestProperties, job);
+				setDebugPorts(common, jobRequestProperties, job);
+				break;
+			case Service:
+				break;
+		}
+        // Service Deployment Type
+        if(jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_custom)) {
+			job.setServiceDeploymentType(ServiceDeploymentType.custom);
+		}
+        else if(jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_other)) {
+			job.setServiceDeploymentType(ServiceDeploymentType.other);
+		}
+        else if(jobRequestProperties.containsKey(ServiceRequestProperties.key_service_type_uima)) {
+			job.setServiceDeploymentType(ServiceDeploymentType.uima);
+		}
+        else {
+        	job.setServiceDeploymentType(ServiceDeploymentType.unspecified);
+        }
+        // Service Id
+        String serviceId = null;
+        if(jobRequestProperties.containsKey(ServiceRequestProperties.key_service_id)) {
+        	serviceId = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_id);
+        }
+        job.setServiceId(serviceId);
+		// sweep out leftover logging trash
+		logSweeper(jobRequestProperties.getProperty(JobRequestProperties.key_log_directory), job.getDuccId());
+		// log
+		jobRequestProperties.specification(logger);
+		// classpath
+        String java_classpath = getDuccClasspath(1);    // for job processes & services 
+		String processClasspath = jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath);
+		logger.debug(methodName, job.getDuccId(), "process CP (spec):"+processClasspath);
+		logger.debug(methodName, job.getDuccId(), "java CP:"+java_classpath);
+		if(processClasspath != null) {
+			if(isClasspathOrderUserBeforeDucc(jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath_order),job.getDuccId())) {
+				logger.info(methodName, job.getDuccId(), "process:OrderUserBeforeDucc");
+				processClasspath=processClasspath+File.pathSeparator+java_classpath;
+			}
+			else {
+				logger.info(methodName, job.getDuccId(), "process:OrderDuccBeforeUser");
+				processClasspath=java_classpath+File.pathSeparator+processClasspath;
+			}
+		}
+		else {
+			logger.info(methodName, job.getDuccId(), "process:OrderDefault");
+			processClasspath=java_classpath;
+		}
+		logger.debug(methodName, job.getDuccId(), "process CP (combined):"+processClasspath);
+		// java command
+		String javaCmd = jobRequestProperties.getProperty(JobSpecificationProperties.key_jvm);
+		if(javaCmd == null) {
+            // Agent will set javaCmd for Driver and Processes
+		}
+		// standard info
+		DuccStandardInfo standardInfo = new DuccStandardInfo();
+		job.setStandardInfo(standardInfo);
+		standardInfo.setUser(jobRequestProperties.getProperty(JobSpecificationProperties.key_user));
+		standardInfo.setSubmitter(jobRequestProperties.getProperty(JobSpecificationProperties.key_submitter_pid_at_host));
+		standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
+		standardInfo.setDateOfCompletion(null);
+		standardInfo.setDescription(jobRequestProperties.getProperty(JobSpecificationProperties.key_description));
+		standardInfo.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+		standardInfo.setWorkingDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_working_directory));
+		String notifications = jobRequestProperties.getProperty(JobSpecificationProperties.key_notifications);
+		if(notifications == null) {
+			standardInfo.setNotifications(null);
+		}
+		else {
+			String[] notificationsArray = notifications.split(" ,");
+			for(String notification : notificationsArray) {
+				notification.trim();
+			}
+			standardInfo.setNotifications(notificationsArray);
+		}
+		// scheduling info
+		DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
+		String ducc_rm_share_quantum = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_rm_share_quantum);
+		if(ducc_rm_share_quantum != null) {
+			ducc_rm_share_quantum = ducc_rm_share_quantum.trim();
+			if(ducc_rm_share_quantum.length() > 0) {
+				schedulingInfo.setShareMemorySize(ducc_rm_share_quantum);
+			}
+		}
+		job.setSchedulingInfo(schedulingInfo);
+		schedulingInfo.setSchedulingClass(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_class));
+		schedulingInfo.setSchedulingPriority(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_priority));
+		schedulingInfo.setSharesMax(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_max));
+		schedulingInfo.setSharesMin(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_min));
+		schedulingInfo.setThreadsPerShare(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_thread_count));
+		schedulingInfo.setShareMemorySize(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size));
+		schedulingInfo.setShareMemoryUnits(MemoryUnits.GB);
+		
+		if (job.getDuccType() == DuccType.Job){ 
+		    checkSchedulingLimits(job, schedulingInfo);
+		}
+		
+		// process_initialization_time_max
+		String pi_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_initialization_time_max);
+		if(pi_time == null) {
+			pi_time = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_agent_launcher_process_init_timeout);
+		}
+		try {
+			long value = Long.parseLong(pi_time)*60*1000;
+			standardInfo.setProcessInitializationTimeMax(value);
+		}
+		catch(Exception e) {
+			logger.error(methodName, job.getDuccId(), e);
+		}
+		// jp
+		ServiceDeploymentType serviceDeploymentType = job.getServiceDeploymentType();
+		if(isJpUima(duccType, serviceDeploymentType)) {
+			String process_DD = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_DD);
+			if(process_DD != null) {
+				// user DD
+				IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = new DuccUimaDeploymentDescriptor(process_DD);
+				job.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
+				dump(job, uimaDeploymentDescriptor);
+			}
+			else {
+				// UIMA aggregate
+				String name = common.jdQueuePrefix+job.getDuccId().toString();
+				String description = job.getStandardInfo().getDescription();
+				int threadCount = Integer.parseInt(job.getSchedulingInfo().getThreadsPerShare());
+				String brokerURL = job.getjobBroker();;
+				String endpoint = job.getjobQueue();
+				ArrayList<IDuccUimaAggregateComponent> components = new ArrayList<IDuccUimaAggregateComponent>();
+				String CMDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM);
+				if(CMDescriptor != null) {
+					ArrayList<String> CMOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
+					IDuccUimaAggregateComponent componentCM = new DuccUimaAggregateComponent(CMDescriptor, CMOverrides);
+					components.add(componentCM);
+				}
+				String AEDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE);
+				if(AEDescriptor != null) {
+					ArrayList<String> AEOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
+					IDuccUimaAggregateComponent componentAE = new DuccUimaAggregateComponent(AEDescriptor, AEOverrides);
+					components.add(componentAE);
+				}
+				String CCDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC);
+				if(CCDescriptor != null) {
+					ArrayList<String> CCOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
+					IDuccUimaAggregateComponent componentCC = new DuccUimaAggregateComponent(CCDescriptor, CCOverrides);
+					components.add(componentCC);
+				}
+				IDuccUimaAggregate uimaAggregate = new DuccUimaAggregate(name,description,threadCount,brokerURL,endpoint,components);
+				job.setUimaDeployableConfiguration(uimaAggregate);
+				dump(job, uimaAggregate);
+			}
+			// pipelines
+			JavaCommandLine pipelineCommandLine = new JavaCommandLine(javaCmd);
+			pipelineCommandLine.setClassName("main:provided-by-Process-Manager");
+			pipelineCommandLine.setClasspath(processClasspath);
+			String process_jvm_args = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_jvm_args);
+			ArrayList<String> pTokens = QuotedOptions.tokenizeList(process_jvm_args, true);
+			for(String token : pTokens) {
+				pipelineCommandLine.addOption(token);
+			}
+		    // Add any site-provided JVM args
+	        String siteJvmArgs = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_process_jvm_args);
+	        pTokens = QuotedOptions.tokenizeList(siteJvmArgs, true);   // a null arg is acceptable
+	        for(String token : pTokens) {
+	            pipelineCommandLine.addOption(token);
+	        }
+			
+			String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
+			int envCountProcess = addEnvironment(job, "process", pipelineCommandLine, processEnvironmentVariables);
+			logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
+			logger.debug(methodName, job.getDuccId(), "pipeline: "+pipelineCommandLine.getCommand());
+			pipelineCommandLine.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+			job.setCommandLine(pipelineCommandLine);
+		}
+		else {
+			// ducclet (sometimes known as arbitrary process)
+			String process_executable = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable);
+			NonJavaCommandLine executableProcessCommandLine = new NonJavaCommandLine(process_executable);
+			String processEnvironmentVariables = jobRequestProperties.getProperty(JobSpecificationProperties.key_environment);
+			int envCountProcess = addEnvironment(job, "process", executableProcessCommandLine, processEnvironmentVariables);
+			logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
+			logger.debug(methodName, job.getDuccId(), "ducclet: "+executableProcessCommandLine.getCommandLine());
+			job.setCommandLine(executableProcessCommandLine);
+			// Tokenize arguments string and strip any quotes, then add to command line.
+			// Note: placeholders replaced by CLI so can avoid the add method.
+			List<String> process_executable_arguments = QuotedOptions.tokenizeList(
+			        jobRequestProperties.getProperty(JobSpecificationProperties.key_process_executable_args), true);
+			executableProcessCommandLine.getArguments().addAll(process_executable_arguments);
+		}
+		// process_initialization_failures_cap
+		String failures_cap = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_initialization_failures_cap);
+		try {
+			long process_failures_cap = Long.parseLong(failures_cap);
+			if(process_failures_cap > 0) {
+				job.setProcessInitFailureCap(process_failures_cap);
+			}
+			else {
+				logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
+			}
+		}
+		catch(Exception e) {
+			logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
+		}
+		// process_failures_limit
+		String failures_limit = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_failures_limit);
+		try {
+			long process_failures_limit = Long.parseLong(failures_limit);
+			if(process_failures_limit > 0) {
+				job.setProcessFailureLimit(process_failures_limit);
+			}
+			else {
+				logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
+			}
+		}
+		catch(Exception e) {
+			logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
+		}
+        //
+        // Set the service dependency, if there is one.
+        //
+        String depstr = jobRequestProperties.getProperty(JobSpecificationProperties.key_service_dependency);
+        if ( depstr == null ) {            
+            logger.debug(methodName, job.getDuccId(), "No service dependencies");
+        } else {
+            logger.debug(methodName, job.getDuccId(), "Adding service dependency", depstr);
+            String[] deps = depstr.split("\\s+");      
+            job.setServiceDependencies(deps);
+        }
+        // Service Endpoint
+        String ep = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_request_endpoint);
+        if ( ep == null ) {                     
+            logger.debug(methodName, job.getDuccId(), "No service endpoint");
+        } else {
+            logger.debug(methodName, job.getDuccId(), "Adding service endpoint", ep);
+            job.setServiceEndpoint(ep);
+        }
+        // Cancel On Interrupt
+        if(jobRequestProperties.containsKey(JobSpecificationProperties.key_cancel_on_interrupt)) {
+        	job.setCancelOnInterrupt();
+        }
+        else if(jobRequestProperties.containsKey(ReservationSpecificationProperties.key_cancel_managed_reservation_on_interrupt)) {
+        	job.setCancelOnInterrupt();
+        }
+		//TODO be sure to clean-up fpath upon job completion!
+		return job;
+	}
+	
+	/*
+	 * Get minimal subset of the DUCC classpath for job driver & job processes
+	 * Cache the values unless asked to reload when testing
+	 */
+	private String[] cps = null;
+	private String getDuccClasspath(int type) {
+	    if (cps != null) {
+	        return cps[type];
+	    }
+	    DuccProperties props = new DuccProperties();
+	    try {
+	        props.load(IDuccEnv.DUCC_CLASSPATH_FILE);
+	    } catch (Exception e) {
+	        logger.error("getClasspath", null, "Using full classpath as failed to load " + IDuccEnv.DUCC_CLASSPATH_FILE);
+	        return System.getProperty("java.class.path");
+	    }
+	    // If reload specified don't cache the results (for ease of testing changes to the classpaths)
+	    if (props.getProperty("ducc.reload.file") != null) {
+	        return props.getProperty(type==0 ? "ducc.jobdriver.classpath" : "ducc.jobprocess.classpath");
+	    } else {
+	        cps = new String[2];
+	        cps[0] = props.getProperty("ducc.jobdriver.classpath");
+	        cps[1] = props.getProperty("ducc.jobprocess.classpath");
+	        return cps[type];
+	    }
+	}
+}

Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/factory/JobFactoryV2.java
------------------------------------------------------------------------------
    svn:eol-style = native