You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:37:56 UTC
svn commit: r1427956 [1/4] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/
main/java/org/apache/uima/ducc/orchestrator/ main/jav...
Author: cwiklik
Date: Wed Jan 2 19:37:55 2013
New Revision: 1427956
URL: http://svn.apache.org/viewvc?rev=1427956&view=rev
Log:
UIMA-2491
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorComponent.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorConstants.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ProcessAccounting.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ReservationFactory.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateJobAccounting.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Validate.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/authentication/DuccWebAdministrators.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/config/OrchestratorConfiguration.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/JdStateEventLogger.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/NodeInventoryEventLogger.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/OrchestratorEventListener.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/event/RMStateEventLogger.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/HealthMonitor.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MaintenanceThread.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/MqReaper.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/maintenance/NodeAccounting.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/monitor/Xmon.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/Checkpointable.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/ComponentHelper.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/utilities/MemorySpecification.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/resources/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/test/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/test/java/
uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/test/resources/
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+public interface Constants {
+
+ public static final int SYNC_LIMIT = 100;
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Constants.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.util.ArrayList;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.jd.JdConstants;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.cli.ReservationRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.ReservationSpecificationProperties;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.DuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccReservationMap;
+import org.apache.uima.ducc.transport.event.common.IDuccState.ReservationState;
+
+
+public class JobDriverHostManager {
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JobDriverHostManager.class.getName());
+
+ private static OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+ private static Messages messages = orchestratorCommonArea.getSystemMessages();
+ private static CommonConfiguration commonConfiguration = orchestratorCommonArea.getCommonConfiguration();
+
+ private static JobDriverHostManager hostManager = new JobDriverHostManager();
+
+ public static JobDriverHostManager getInstance() {
+ return hostManager;
+ }
+
+ private String jdHostClass = "job-driver";
+ private String jdHostDescription = "Job Driver";
+ private String jdHostMemorySize = "8GB";
+ private String jdHostUser = JdConstants.reserveUser;
+ private String jdHostNumberOfMachines = "1";
+
+ private ArrayList<String> keyList = new ArrayList<String>();
+ private TreeMap<String,NodeIdentity> nodeMap = new TreeMap<String,NodeIdentity>();
+
+ private AtomicBoolean assigned = new AtomicBoolean(false);
+
+ private DuccWorkReservation duccWorkReservation = null;
+
+ public JobDriverHostManager() {
+ }
+
+ private void updateAssigned() {
+ if(keyList.isEmpty()) {
+ assigned.set(false);
+ }
+ else {
+ assigned.set(true);
+ }
+ }
+
+ public void addNode(NodeIdentity node) {
+ synchronized(nodeMap) {
+ if(node != null) {
+ String key = node.getName();
+ nodeMap.put(key, node);
+ keyList.add(key);
+ }
+ updateAssigned();
+ }
+ }
+
+ public void delNode(NodeIdentity node) {
+ synchronized(nodeMap) {
+ if(node != null) {
+ String key = node.getName();
+ nodeMap.remove(key);
+ keyList.remove(key);
+ }
+ updateAssigned();
+ }
+ }
+
+ public int nodes() {
+ if(!assigned.get()) {
+ tryAssignment();
+ }
+ return nodeMap.size();
+ }
+
+ public NodeIdentity getNode() {
+ NodeIdentity retVal = null;
+ if(!assigned.get()) {
+ tryAssignment();
+ }
+ synchronized(nodeMap) {
+ if(!nodeMap.isEmpty()) {
+ String key = keyList.remove(0);
+ keyList.add(key);
+ retVal = nodeMap.get(key);
+ }
+ }
+ return retVal;
+ }
+
+ private void tryAssignment() {
+ String methodName = "tryAssignment";
+ synchronized(nodeMap) {
+ if(duccWorkReservation != null) {
+ if(duccWorkReservation.isDispatchable()) {
+ if(!duccWorkReservation.getReservationMap().isEmpty()) {
+ IDuccReservationMap map = duccWorkReservation.getReservationMap();
+ if(!map.isEmpty()) {
+ keyList = new ArrayList<String>();
+ nodeMap = new TreeMap<String,NodeIdentity>();
+ for (DuccId key : map.keySet()) {
+ IDuccReservation value = duccWorkReservation.getReservationMap().get(key);
+ NodeIdentity node = value.getNodeIdentity();
+ addNode(node);
+ logger.info(methodName, null, messages.fetchLabel("assigned")+node.getName()+" "+node.getIp());
+ }
+ }
+ }
+ }
+ }
+ }
+ return;
+ }
+
+ private void setConfiguration() {
+ String methodName="setConfiguration";
+ CommonConfiguration common = JobDriverHostManager.commonConfiguration;
+ if(common.jdHostClass != null) {
+ jdHostClass = common.jdHostClass;
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.class")+jdHostClass);
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.class")+jdHostClass+" "+messages.fetch("(default)"));
+ }
+ if(common.jdHostDescription != null) {
+ jdHostDescription = common.jdHostDescription;
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.description")+jdHostDescription);
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.description")+jdHostDescription+" "+messages.fetch("(default)"));
+ }
+ if(common.jdHostMemorySize != null) {
+ jdHostMemorySize = common.jdHostMemorySize;
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.memory.size")+jdHostMemorySize);
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.memory.size")+jdHostMemorySize+" "+messages.fetch("(default)"));
+ }
+ if(common.jdHostNumberOfMachines != null) {
+ jdHostNumberOfMachines = common.jdHostNumberOfMachines;
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.number.of.machines")+jdHostNumberOfMachines);
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.number.of.machines")+jdHostNumberOfMachines+" "+messages.fetch("(default)"));
+ }
+ if(common.jdHostUser != null) {
+ jdHostUser = common.jdHostUser;
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.user")+jdHostUser);
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("jd.host.user")+jdHostUser+" "+messages.fetch("(default)"));
+ }
+ }
+
+ private boolean processJdHostClass() {
+ String methodName = "processJdHostClass";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ boolean retVal = false;
+ if(commonConfiguration.jdHostClass != null) {
+ setConfiguration();
+ ReservationRequestProperties reservationRequestProperties = new ReservationRequestProperties();
+ reservationRequestProperties.put(ReservationSpecificationProperties.key_scheduling_class, jdHostClass);
+ reservationRequestProperties.put(ReservationSpecificationProperties.key_description, jdHostDescription);
+ reservationRequestProperties.put(ReservationSpecificationProperties.key_instance_memory_size, jdHostMemorySize);
+ reservationRequestProperties.put(ReservationSpecificationProperties.key_number_of_instances, jdHostNumberOfMachines);
+ reservationRequestProperties.put(ReservationSpecificationProperties.key_user, jdHostUser);
+ duccWorkReservation = ReservationFactory.getInstance().create(commonConfiguration, reservationRequestProperties);
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+ workMap.addDuccWork(duccWorkReservation);
+ // state: Received
+ duccWorkReservation.stateChange(ReservationState.Received);
+ OrchestratorCheckpoint.getInstance().saveState();
+ // state: WaitingForResources
+ duccWorkReservation.stateChange(ReservationState.WaitingForResources);
+ OrchestratorCheckpoint.getInstance().saveState();
+ retVal = true;
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return retVal;
+ }
+
+ public void init() {
+ String methodName = "init";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ processJdHostClass();
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return ;
+ }
+
+ public void conditional() {
+ String methodName = "conditional";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+ if(workMap.size() == 0) {
+ logger.info(methodName, null, messages.fetch("make allocation for JD"));
+ init();
+ }
+ else {
+ logger.info(methodName, null, messages.fetch("bypass allocation for JD"));
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return ;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobDriverHostManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
+import org.apache.uima.ducc.common.utils.TimeStamp;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+import org.apache.uima.ducc.common.utils.id.IDuccIdFactory;
+import org.apache.uima.ducc.transport.cmdline.JavaCommandLine;
+import org.apache.uima.ducc.transport.event.cli.JobRequestProperties;
+import org.apache.uima.ducc.transport.event.cli.JobSpecificationProperties;
+import org.apache.uima.ducc.transport.event.cli.ServiceRequestProperties;
+import org.apache.uima.ducc.transport.event.common.DuccProcess;
+import org.apache.uima.ducc.transport.event.common.DuccSchedulingInfo;
+import org.apache.uima.ducc.transport.event.common.DuccStandardInfo;
+import org.apache.uima.ducc.transport.event.common.DuccUimaAggregate;
+import org.apache.uima.ducc.transport.event.common.DuccUimaAggregateComponent;
+import org.apache.uima.ducc.transport.event.common.DuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.DuccWorkPopDriver;
+import org.apache.uima.ducc.transport.event.common.IDuccCommand;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregate;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaAggregateComponent;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessType.ProcessType;
+import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
+import org.apache.uima.ducc.transport.event.common.IDuccUnits.MemoryUnits;
+import org.apache.uima.ducc.transport.event.common.IResourceState.ResourceState;
+
+
+public class JobFactory {
+ private static JobFactory jobFactory = new JobFactory();
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(JobFactory.class.getName());
+
+ public static JobFactory getInstance() {
+ return jobFactory;
+ }
+
+ private OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+ private IDuccIdFactory jobDuccIdFactory = orchestratorCommonArea.getJobDuccIdFactory();
+ private IDuccIdFactory serviceDuccIdFactory = orchestratorCommonArea.getServiceDuccIdFactory();
+ private JobDriverHostManager hostManager = orchestratorCommonArea.getHostManager();
+ private DuccIdFactory duccIdFactory = new DuccIdFactory();
+
+ private String java_classpath = System.getProperty("java.class.path");
+ private String classpath_order = System.getProperty("ducc.orchestrator.job.factory.classpath.order");
+
+ private int addEnvironment(DuccWorkJob job, String type, JavaCommandLine javaCommandLine, String environmentVariables) {
+ String methodName = "addEnvironment";
+ logger.trace(methodName, job.getDuccId(), "enter");
+ int retVal = 0;
+ if(environmentVariables != null) {
+ String[] envVarList = environmentVariables.split("\\s+");
+ for (String envVar : envVarList) {
+ String[] kv = envVar.split("=");
+ String envKey = kv[0].trim();
+ String envValue = kv[1].trim();
+ javaCommandLine.addEnvVar(envKey, envValue);
+ String message = "type:"+type+" "+"key:"+envKey+" "+"value:"+envValue;
+ logger.debug(methodName, job.getDuccId(), message);
+ }
+ retVal++;
+ }
+ logger.trace(methodName, job.getDuccId(), "exit");
+ return retVal;
+ }
+
+ private void checkSpec(DuccWorkJob job, JobRequestProperties jobRequestProperties) {
+ String methodName = "checkSpec";
+ logger.trace(methodName, job.getDuccId(), "enter");
+ jobRequestProperties.normalize();
+ Enumeration<Object> keys = jobRequestProperties.keys();
+ while(keys.hasMoreElements()) {
+ String key = (String) keys.nextElement();
+ if(!jobRequestProperties.isRecognized(key)) {
+ logger.warn(methodName, job.getDuccId(), "unrecognized: "+key);
+ }
+ }
+ logger.trace(methodName, job.getDuccId(), "exit");
+ return;
+ }
+
+ private ArrayList<String> toArrayList(String overrides) {
+ String methodName = "toArrayList";
+ logger.trace(methodName, null, "enter");
+ ArrayList<String> list = new ArrayList<String>();
+ if(overrides != null) {
+ String[] items = overrides.split(",");
+ for(String item : items) {
+ list.add(item.trim());
+ }
+ }
+ logger.trace(methodName, null, "exit");
+ return list;
+ }
+
+ /*
+ private static ArrayList<String> parseJvmArgs(String jvmArgs) {
+ ArrayList<String> args = new ArrayList<String>();
+ if(jvmArgs != null) {
+ String[] tokens = jvmArgs.split("-");
+ for(String token : tokens) {
+ String flag = token.trim();
+ if(flag.equals("")) {
+ }
+ else if(flag.equals("\"")) {
+ }
+ else if(flag.equals("'")) {
+ }
+ else {
+ args.add("-"+flag);
+ }
+ }
+ }
+ return args;
+ }
+ */
+
+ // tokenize by blanks, but keep quoted items together
+
+ private ArrayList<String> parseJvmArgs(String jvmArgs) {
+ ArrayList<String> matchList = new ArrayList<String>();
+ if(jvmArgs != null) {
+ Pattern regex = Pattern.compile("[^\\s\"']+|\"([^\"]*)\"|'([^']*)'");
+ Matcher regexMatcher = regex.matcher(jvmArgs);
+ while (regexMatcher.find()) {
+ if (regexMatcher.group(1) != null) {
+ // Add double-quoted string without the quotes
+ matchList.add(regexMatcher.group(1));
+ } else if (regexMatcher.group(2) != null) {
+ // Add single-quoted string without the quotes
+ matchList.add(regexMatcher.group(2));
+ } else {
+ // Add unquoted word
+ matchList.add(regexMatcher.group());
+ }
+ }
+ }
+ return matchList;
+ }
+
+ private void dump(DuccWorkJob job, IDuccUimaAggregate uimaAggregate) {
+ String methodName = "dump";
+ logger.info(methodName, job.getDuccId(), "brokerURL "+uimaAggregate.getBrokerURL());
+ logger.info(methodName, job.getDuccId(), "endpoint "+uimaAggregate.getEndpoint());
+ logger.info(methodName, job.getDuccId(), "description "+uimaAggregate.getDescription());
+ logger.info(methodName, job.getDuccId(), "name "+uimaAggregate.getName());
+ logger.info(methodName, job.getDuccId(), "thread-count "+uimaAggregate.getThreadCount());
+ List<IDuccUimaAggregateComponent> components = uimaAggregate.getComponents();
+ for(IDuccUimaAggregateComponent component : components) {
+ logger.info(methodName, job.getDuccId(), "descriptor "+component.getDescriptor());
+ List<String> overrides = component.getOverrides();
+ for(String override : overrides) {
+ logger.info(methodName, job.getDuccId(), "override "+override);
+ }
+ }
+ }
+
+ private void dump(DuccWorkJob job, IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor) {
+ String methodName = "dump";
+ logger.info(methodName, job.getDuccId(), "uimaDeploymentDescriptor "+uimaDeploymentDescriptor);
+ }
+
+ public void logSweeper(String logDir, DuccId jobId) {
+ String methodName = "logSweeper";
+ if(logDir != null) {
+ if(jobId != null) {
+ if(!logDir.endsWith(File.separator)) {
+ logDir += File.separator;
+ }
+ logDir += jobId;
+ try {
+ File file = new File(logDir);
+ if(file.exists()) {
+ File dest = new File(logDir+"."+"sweep"+"."+java.util.Calendar.getInstance().getTime().toString());
+ file.renameTo(dest);
+ logger.warn(methodName, jobId, "renamed "+logDir);
+ }
+ }
+ catch(Throwable t) {
+ logger.warn(methodName, jobId, "unable to rename "+logDir, t);
+ }
+ }
+ else {
+ logger.warn(methodName, jobId, "jobId is null");
+ }
+ }
+ else {
+ logger.warn(methodName, jobId, "logDir is null");
+ }
+ }
+
+ private boolean isClasspathOrderUserBeforeDucc(String user_specified_classpath_order, DuccId jobId) {
+ String methodName = "isClasspathOrderUserBeforeDucc";
+ boolean retVal = false;
+ if(user_specified_classpath_order != null) {
+ if(user_specified_classpath_order.trim().equals("user-before-ducc")) {
+ logger.warn(methodName, jobId, "user specified classpath order: "+user_specified_classpath_order);
+ retVal = true;
+ }
+ }
+ else if(classpath_order != null) {
+ if(classpath_order.trim().equals("user-before-ducc")) {
+ retVal = true;
+ }
+ }
+ return retVal;
+ }
+
+ public DuccWorkJob create(CommonConfiguration common, JobRequestProperties jobRequestProperties) {
+ String methodName = "create";
+ DuccWorkJob job = new DuccWorkJob();
+ checkSpec(job, jobRequestProperties);
+ // id, type
+ String ddCr = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR);
+ if(ddCr == null) {
+ job.setDuccType(DuccType.Service);
+ job.setDuccId(serviceDuccIdFactory.next());
+ }
+ else {
+ job.setDuccType(DuccType.Job);
+ job.setDuccId(jobDuccIdFactory.next());
+
+ }
+ // sweep out leftover logging trash
+ logSweeper(jobRequestProperties.getProperty(JobRequestProperties.key_log_directory), job.getDuccId());
+ // log
+ jobRequestProperties.specification(logger);
+ // classpath
+ String processClasspath = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_classpath);
+ logger.debug(methodName, job.getDuccId(), "process CP (spec):"+processClasspath);
+ logger.debug(methodName, job.getDuccId(), "java CP:"+java_classpath);
+ if(processClasspath != null) {
+ if(isClasspathOrderUserBeforeDucc(jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath_order),job.getDuccId())) {
+ logger.debug(methodName, job.getDuccId(), "process:OrderUserBeforeDucc");
+ processClasspath=processClasspath+File.pathSeparator+java_classpath;
+ }
+ else {
+ processClasspath=java_classpath+File.pathSeparator+processClasspath;
+ }
+ }
+ else {
+ processClasspath=java_classpath;
+ }
+ logger.debug(methodName, job.getDuccId(), "process CP (combined):"+processClasspath);
+ // java command
+ String javaCmd = jobRequestProperties.getProperty(JobSpecificationProperties.key_jvm);
+ if(javaCmd == null) {
+ // Agent will set javaCmd for Driver and Processes
+ }
+ // driver
+ switch(job.getDuccType()) {
+ case Job:
+ String job_broker = jobRequestProperties.getProperty(JobRequestProperties.key_job_broker);
+ if(job_broker == null) {
+ job_broker = common.brokerUrl;
+ }
+ job.setJobBroker(job_broker);
+ String job_queue = jobRequestProperties.getProperty(JobRequestProperties.key_job_endpoint);
+ if(job_queue == null) {
+ job_queue = common.jdQueuePrefix+job.getDuccId();
+ }
+ job.setJobQueue(job_queue);
+ String crxml = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR);
+ String crcfg = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_descriptor_CR_overrides);
+ String meta_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_get_meta_time_max);
+ if(meta_time == null) {
+ meta_time = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.default_process_get_meta_time_max);
+ }
+ String wi_time = jobRequestProperties.getProperty(JobRequestProperties.key_process_per_item_time_max);
+ if(wi_time == null) {
+ wi_time = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.default_process_per_item_time_max);
+ }
+ String processExceptionHandler = jobRequestProperties.getProperty(JobRequestProperties.key_driver_exception_handler);
+ DuccWorkPopDriver driver = new DuccWorkPopDriver(job.getjobBroker(), job.getjobQueue(), crxml, crcfg, meta_time, wi_time, processExceptionHandler);
+ JavaCommandLine driverCommandLine = new JavaCommandLine(javaCmd);
+ driverCommandLine.setClassName(IDuccCommand.main);
+ driverCommandLine.addOption(IDuccCommand.arg_ducc_deploy_configruation);
+ driverCommandLine.addOption(IDuccCommand.arg_ducc_deploy_components);
+ driverCommandLine.addOption(IDuccCommand.arg_ducc_job_id+job.getDuccId().toString());
+ // classpath
+ String driverClasspath = jobRequestProperties.getProperty(JobSpecificationProperties.key_driver_classpath);
+ logger.debug(methodName, job.getDuccId(), "driver CP (spec):"+driverClasspath);
+ logger.debug(methodName, job.getDuccId(), "java CP:"+java_classpath);
+ if(driverClasspath != null) {
+ if(isClasspathOrderUserBeforeDucc(jobRequestProperties.getProperty(JobSpecificationProperties.key_classpath_order),job.getDuccId())) {
+ logger.debug(methodName, job.getDuccId(), "driver:OrderUserBeforeDucc");
+ driverClasspath=driverClasspath+File.pathSeparator+java_classpath;
+ }
+ else {
+ driverClasspath=java_classpath+File.pathSeparator+driverClasspath;
+ }
+ }
+ else {
+ driverClasspath=java_classpath;
+ }
+ logger.debug(methodName, job.getDuccId(), "driver CP (combined):"+driverClasspath);
+ driverCommandLine.setClasspath(driverClasspath);
+ String driver_jvm_args = jobRequestProperties.getProperty(JobRequestProperties.key_driver_jvm_args);
+ ArrayList<String> dTokens = parseJvmArgs(driver_jvm_args);
+ for(String token : dTokens) {
+ driverCommandLine.addOption(token);
+ }
+ String driverEnvironmentVariables = jobRequestProperties.getProperty(JobRequestProperties.key_driver_environment);
+ int envCountDriver = addEnvironment(job, "driver", driverCommandLine, driverEnvironmentVariables);
+ logger.info(methodName, job.getDuccId(), "driver env vars: "+envCountDriver);
+ logger.debug(methodName, job.getDuccId(), "driver: "+driverCommandLine.getCommand());
+ driverCommandLine.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+ driver.setCommandLine(driverCommandLine);
+ //
+ NodeIdentity nodeIdentity = hostManager.getNode();
+ DuccId duccId = duccIdFactory.next();
+ duccId.setFriendly(0);
+ DuccProcess driverProcess = new DuccProcess(duccId,nodeIdentity,ProcessType.Pop);
+ driverProcess.setResourceState(ResourceState.Allocated);
+ driverProcess.setNodeIdentity(nodeIdentity);
+ driver.getProcessMap().put(driverProcess.getDuccId(), driverProcess);
+ //
+ orchestratorCommonArea.getProcessAccounting().addProcess(duccId, job.getDuccId());
+ //
+ job.setDriver(driver);
+ break;
+ case Service:
+ break;
+ }
+ // standard info
+ DuccStandardInfo standardInfo = new DuccStandardInfo();
+ job.setStandardInfo(standardInfo);
+ standardInfo.setUser(jobRequestProperties.getProperty(JobSpecificationProperties.key_user));
+ standardInfo.setDateOfSubmission(TimeStamp.getCurrentMillis());
+ standardInfo.setDateOfCompletion(null);
+ standardInfo.setDescription(jobRequestProperties.getProperty(JobSpecificationProperties.key_description));
+ standardInfo.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+ standardInfo.setWorkingDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_working_directory));
+ String notifications = jobRequestProperties.getProperty(JobSpecificationProperties.key_notifications);
+ if(notifications == null) {
+ standardInfo.setNotifications(null);
+ }
+ else {
+ String[] notificationsArray = notifications.split(" ,");
+ for(String notification : notificationsArray) {
+ notification.trim();
+ }
+ standardInfo.setNotifications(notificationsArray);
+ }
+ // scheduling info
+ DuccSchedulingInfo schedulingInfo = new DuccSchedulingInfo();
+ String ducc_rm_share_quantum = DuccPropertiesResolver.getInstance().getFileProperty(DuccPropertiesResolver.ducc_rm_share_quantum);
+ if(ducc_rm_share_quantum != null) {
+ ducc_rm_share_quantum = ducc_rm_share_quantum.trim();
+ if(ducc_rm_share_quantum.length() > 0) {
+ schedulingInfo.setShareMemorySize(ducc_rm_share_quantum);
+ }
+ }
+ job.setSchedulingInfo(schedulingInfo);
+ schedulingInfo.setSchedulingClass(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_class));
+ schedulingInfo.setSchedulingPriority(jobRequestProperties.getProperty(JobSpecificationProperties.key_scheduling_priority));
+ schedulingInfo.setSharesMax(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_max));
+ schedulingInfo.setSharesMin(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_deployments_min));
+ schedulingInfo.setThreadsPerShare(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_thread_count));
+ schedulingInfo.setShareMemorySize(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_memory_size));
+ schedulingInfo.setShareMemoryUnits(MemoryUnits.GB);
+ String process_DD = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_DD);
+ if(process_DD != null) {
+ // user DD
+ IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = new DuccUimaDeploymentDescriptor(process_DD);
+ job.setUimaDeployableConfiguration(uimaDeploymentDescriptor);
+ dump(job, uimaDeploymentDescriptor);
+ }
+ else {
+ // UIMA aggregate
+ String name = common.jdQueuePrefix+job.getDuccId().toString();
+ String description = job.getStandardInfo().getDescription();
+ int threadCount = Integer.parseInt(job.getSchedulingInfo().getThreadsPerShare());
+ String brokerURL = job.getjobBroker();;
+ String endpoint = job.getjobQueue();
+ ArrayList<IDuccUimaAggregateComponent> components = new ArrayList<IDuccUimaAggregateComponent>();
+ String CMDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM);
+ if(CMDescriptor != null) {
+ ArrayList<String> CMOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CM_overrides));
+ IDuccUimaAggregateComponent componentCM = new DuccUimaAggregateComponent(CMDescriptor, CMOverrides);
+ components.add(componentCM);
+ }
+ String AEDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE);
+ if(AEDescriptor != null) {
+ ArrayList<String> AEOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_AE_overrides));
+ IDuccUimaAggregateComponent componentAE = new DuccUimaAggregateComponent(AEDescriptor, AEOverrides);
+ components.add(componentAE);
+ }
+ String CCDescriptor = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC);
+ if(CCDescriptor != null) {
+ ArrayList<String> CCOverrides = toArrayList(jobRequestProperties.getProperty(JobSpecificationProperties.key_process_descriptor_CC_overrides));
+ IDuccUimaAggregateComponent componentCC = new DuccUimaAggregateComponent(CCDescriptor, CCOverrides);
+ components.add(componentCC);
+ }
+ IDuccUimaAggregate uimaAggregate = new DuccUimaAggregate(name,description,threadCount,brokerURL,endpoint,components);
+ job.setUimaDeployableConfiguration(uimaAggregate);
+ dump(job, uimaAggregate);
+ }
+ // pipelines
+ JavaCommandLine pipelineCommandLine = new JavaCommandLine(javaCmd);
+ pipelineCommandLine.setClassName("main:provided-by-Process-Manager");
+ pipelineCommandLine.setClasspath(processClasspath);
+ String process_jvm_args = jobRequestProperties.getProperty(JobRequestProperties.key_process_jvm_args);
+ ArrayList<String> pTokens = parseJvmArgs(process_jvm_args);
+ for(String token : pTokens) {
+ pipelineCommandLine.addOption(token);
+ }
+ String processEnvironmentVariables = jobRequestProperties.getProperty(JobRequestProperties.key_process_environment);
+ int envCountProcess = addEnvironment(job, "process", pipelineCommandLine, processEnvironmentVariables);
+ logger.info(methodName, job.getDuccId(), "process env vars: "+envCountProcess);
+ logger.debug(methodName, job.getDuccId(), "pipeline: "+pipelineCommandLine.getCommand());
+ pipelineCommandLine.setLogDirectory(jobRequestProperties.getProperty(JobSpecificationProperties.key_log_directory));
+ job.setCommandLine(pipelineCommandLine);
+ // process_initialization_failures_cap
+ String failures_cap = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_initialization_failures_cap);
+ try {
+ long process_failures_cap = Long.parseLong(failures_cap);
+ if(process_failures_cap > 0) {
+ job.setProcessInitFailureCap(process_failures_cap);
+ }
+ else {
+ logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
+ }
+ }
+ catch(Exception e) {
+ logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_initialization_failures_cap+": "+failures_cap);
+ }
+ // process_failures_limit
+ String failures_limit = jobRequestProperties.getProperty(JobSpecificationProperties.key_process_failures_limit);
+ try {
+ long process_failures_limit = Long.parseLong(failures_limit);
+ if(process_failures_limit > 0) {
+ job.setProcessFailureLimit(process_failures_limit);
+ }
+ else {
+ logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
+ }
+ }
+ catch(Exception e) {
+ logger.warn(methodName, job.getDuccId(), "invalid "+JobSpecificationProperties.key_process_failures_limit+": "+failures_limit);
+ }
+
+ //
+ // Set the service dependency, if there is one.
+ //
+ String depstr = jobRequestProperties.getProperty(JobSpecificationProperties.key_service_dependency);
+ if ( depstr == null ) {
+ logger.debug(methodName, job.getDuccId(), "No service dependencies");
+ } else {
+ logger.debug(methodName, job.getDuccId(), "Adding service dependency", depstr);
+ String[] deps = depstr.split(",");
+ job.setServiceDependencies(deps);
+ }
+
+ String ep = jobRequestProperties.getProperty(ServiceRequestProperties.key_service_request_endpoint);
+ if ( ep == null ) {
+ logger.debug(methodName, job.getDuccId(), "No service endpoint");
+ } else {
+ logger.debug(methodName, job.getDuccId(), "Adding service endpoint", ep);
+ job.setServiceEndpoint(ep);
+ }
+ //TODO be sure to clean-up fpath upon job completion!
+ return job;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/JobFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+
+public class ORTracer implements Processor {
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(ORTracer.class.getName());
+ private static final DuccId jobid = null;
+
+ private String name = null;
+
+ public ORTracer(String name) {
+ this.name = name;
+ }
+
+
+ public void process(Exchange arg0) throws Exception {
+ String location = "process";
+ logger.info(location, jobid, name);
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/ORTracer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import org.apache.uima.ducc.transport.event.CancelJobDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.CancelServiceDuccEvent;
+import org.apache.uima.ducc.transport.event.JdStateDuccEvent;
+import org.apache.uima.ducc.transport.event.NodeInventoryUpdateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorAbbreviatedStateDuccEvent;
+import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
+import org.apache.uima.ducc.transport.event.RmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SmStateDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitJobDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitReservationDuccEvent;
+import org.apache.uima.ducc.transport.event.SubmitServiceDuccEvent;
+
+public interface Orchestrator {
+ public void reconcileRmState(RmStateDuccEvent duccEvent);
+ public void reconcileSmState(SmStateDuccEvent duccEvent);
+ public void reconcileJdState(JdStateDuccEvent duccEvent);
+ public void reconcileNodeInventory(NodeInventoryUpdateDuccEvent duccEvent);
+ public OrchestratorStateDuccEvent getState();
+ public OrchestratorAbbreviatedStateDuccEvent getAbbreviatedState();
+ public void startJob(SubmitJobDuccEvent duccEvent);
+ public void stopJob(CancelJobDuccEvent duccEvent);
+ public void startReservation(SubmitReservationDuccEvent duccEvent);
+ public void stopReservation(CancelReservationDuccEvent duccEvent);
+ public void startService(SubmitServiceDuccEvent duccEvent);
+ public void stopService(CancelServiceDuccEvent duccEvent);
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/Orchestrator.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.IOHelper;
+import org.apache.uima.ducc.orchestrator.utilities.Checkpointable;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+
+
+public class OrchestratorCheckpoint {
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorCheckpoint.class.getName());
+
+ private static OrchestratorCommonArea orchestratorCommonArea = OrchestratorCommonArea.getInstance();
+ private static Messages messages = orchestratorCommonArea.getSystemMessages();
+
+ private static String fileName = orchestratorCommonArea.getStateDirectory()+File.separator+"orchestrator.ckpt";
+
+ private static OrchestratorCheckpoint orchestratorCheckpoint = new OrchestratorCheckpoint();
+
+ public static OrchestratorCheckpoint getInstance() {
+ return orchestratorCheckpoint;
+ }
+
+ public OrchestratorCheckpoint() {
+ IOHelper.mkdirs(orchestratorCommonArea.getStateDirectory());
+ return;
+ }
+
+ private volatile boolean saveEnabled = false;
+ private volatile boolean restoreEnabled = false;
+
+ private volatile String status = "off";
+
+ public boolean switchOnOff(String position) {
+ String methodName = "switchOnOff";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ if(position != null) {
+ String desiredPosition = position.toLowerCase();
+ if(desiredPosition.equals("off")) {
+ resetSaveEnabled();
+ resetRestoreEnabled();
+ status = desiredPosition;
+ logger.debug(methodName, null, messages.fetchLabel("reset to")+position);
+ }
+ else if(desiredPosition.equals("on")) {
+ setSaveEnabled();
+ setRestoreEnabled();
+ status = desiredPosition;
+ logger.debug(methodName, null, messages.fetchLabel("set to")+position);
+ }
+ else {
+ logger.warn(methodName, null, messages.fetchLabel("ignored")+position);
+ }
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("missing, using")+status);
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return restoreEnabled && saveEnabled;
+ }
+
+ public boolean isSaveEnabled() {
+ String methodName = "isSaveEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ logger.debug(methodName, null, saveEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return saveEnabled;
+ }
+
+ public void setSaveEnabled() {
+ String methodName = "setSaveEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ saveEnabled = true;
+ logger.debug(methodName, null, saveEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ public void resetSaveEnabled() {
+ String methodName = "resetSaveEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ saveEnabled = false;
+ logger.debug(methodName, null, saveEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ public boolean isRestoreEnabled() {
+ String methodName = "isRestoreEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ logger.debug(methodName, null, restoreEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return restoreEnabled;
+ }
+
+ public void setRestoreEnabled() {
+ String methodName = "setRestoreEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ restoreEnabled = true;
+ logger.debug(methodName, null, restoreEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ public void resetRestoreEnabled() {
+ String methodName = "resetRestoreEnabled";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ restoreEnabled = false;
+ logger.debug(methodName, null, restoreEnabled);
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return;
+ }
+
+ public boolean saveState() {
+ String methodName = "saveState";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ boolean retVal = false;
+ if(saveEnabled) {
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ try
+ {
+ logger.info(methodName, null, messages.fetchLabel("saving to")+fileName);
+ FileOutputStream fos = null;
+ ObjectOutputStream out = null;
+ fos = new FileOutputStream(fileName);
+ out = new ObjectOutputStream(fos);
+ Checkpointable checkpointable = orchestratorCommonArea.getCheckpointable();
+ out.writeObject(checkpointable);
+ out.close();
+ retVal = true;
+ logger.info(methodName, null, messages.fetchLabel("saved")+fileName);
+ }
+ catch(IOException e)
+ {
+ logger.error(methodName, null, e);
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ }
+ else {
+ logger.debug(methodName, null, messages.fetchLabel("bypass saving to")+fileName);
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return retVal;
+ }
+
+ public boolean restoreState() {
+ String methodName = "restoreState";
+ logger.trace(methodName, null, messages.fetch("enter"));
+ boolean retVal = false;
+ if(saveEnabled) {
+ DuccWorkMap workMap = orchestratorCommonArea.getWorkMap();
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ try
+ {
+ logger.info(methodName, null, messages.fetchLabel("restoring from")+fileName);
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ fis = new FileInputStream(fileName);
+ in = new ObjectInputStream(fis);
+ Checkpointable checkpointable = (Checkpointable)in.readObject();
+ orchestratorCommonArea.setCheckpointable(checkpointable);
+ in.close();
+ retVal = true;
+ logger.info(methodName, null, messages.fetch("restored"));
+ }
+ catch(IOException e)
+ {
+ logger.warn(methodName, null, e);
+ }
+ catch(ClassNotFoundException e)
+ {
+ logger.error(methodName, null, e);
+ }
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ }
+ else {
+ logger.info(methodName, null, messages.fetchLabel("bypass restoring from")+fileName);
+ }
+ logger.trace(methodName, null, messages.fetch("exit"));
+ return retVal;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCheckpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java?rev=1427956&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java Wed Jan 2 19:37:55 2013
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.orchestrator;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.uima.ducc.common.IDuccEnv;
+import org.apache.uima.ducc.common.config.CommonConfiguration;
+import org.apache.uima.ducc.common.internationalization.Messages;
+import org.apache.uima.ducc.common.persistence.IPropertiesFileManager;
+import org.apache.uima.ducc.common.persistence.PropertiesFileManager;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.IDuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.common.utils.id.DuccIdFactory;
+import org.apache.uima.ducc.common.utils.id.IDuccIdFactory;
+import org.apache.uima.ducc.orchestrator.utilities.Checkpointable;
+import org.apache.uima.ducc.orchestrator.utilities.ComponentHelper;
+import org.apache.uima.ducc.transport.event.common.DuccWorkMap;
+import org.apache.uima.ducc.transport.event.common.history.HistoryPersistenceManager;
+import org.apache.uima.ducc.transport.event.jd.DriverStatusReport;
+
+
+public class OrchestratorCommonArea {
+
+ private static OrchestratorCommonArea orchestratorCommonArea = null;
+
+ private static final DuccLogger logger = DuccLoggerComponents.getOrLogger(OrchestratorCommonArea.class.getName());
+
+ public static OrchestratorCommonArea getInstance() {
+ assert(orchestratorCommonArea != null);
+ return orchestratorCommonArea;
+ }
+
+ public static void initialize(CommonConfiguration commonConfiguration) throws IOException {
+ orchestratorCommonArea = new OrchestratorCommonArea();
+ orchestratorCommonArea.commonConfiguration = commonConfiguration;
+ orchestratorCommonArea.init();
+ }
+
+ private OrchestratorCommonArea() {
+ }
+
+ private CommonConfiguration commonConfiguration = null;
+
+ public CommonConfiguration getCommonConfiguration() {
+ return commonConfiguration;
+ }
+
+ private HistoryPersistenceManager historyPersistenceManager = null;
+
+ private void init() {
+ ComponentHelper.oneInstance(IDuccEnv.DUCC_STATE_DIR,"orchestrator");
+ setPropertiesFileManager(new PropertiesFileManager(IDuccLoggerComponents.abbrv_orchestrator, IDuccEnv.DUCC_STATE_DIR, constOrchestratorProperties, false, true));
+ setJobDuccIdFactory(new DuccIdFactory(propertiesFileManager,constJobSeqNo));
+ setServiceDuccIdFactory(new DuccIdFactory(propertiesFileManager,constServiceSeqNo));
+ setReservationDuccIdFactory(new DuccIdFactory(propertiesFileManager,constReservationSeqNo));
+ workMap = new DuccWorkMap();
+ driverStatusReportMap = new ConcurrentHashMap<DuccId,DriverStatusReport>();
+ processAccounting = new ProcessAccounting();
+ OrchestratorCheckpoint.getInstance().switchOnOff(commonConfiguration.orchestratorCheckpoint);
+ OrchestratorCheckpoint.getInstance().restoreState();
+ hostManager = JobDriverHostManager.getInstance();
+ historyPersistenceManager = HistoryPersistenceManager.getInstance();
+ }
+
+ public String getStateDirectory() {
+ return IDuccEnv.DUCC_STATE_DIR;
+ }
+
+ private static final String constOrchestratorProperties = "orchestrator.properties";
+ private static final String constJobSeqNo = "job.seqno";
+ private static final String constServiceSeqNo = "service.seqno";
+ private static final String constReservationSeqNo = "reservation.seqno";
+
+ // **********
+
+ private IPropertiesFileManager propertiesFileManager = null;
+
+ private void setPropertiesFileManager(IPropertiesFileManager instance) {
+ propertiesFileManager = instance;
+ }
+
+ public IPropertiesFileManager getPropertiesFileManager() {
+ assert(propertiesFileManager != null);
+ return propertiesFileManager;
+ }
+
+ // **********
+
+ private IDuccIdFactory jobDuccIdFactory = null;
+
+ private void setJobDuccIdFactory(IDuccIdFactory instance) {
+ jobDuccIdFactory = instance;
+ }
+
+ public IDuccIdFactory getJobDuccIdFactory() {
+ return jobDuccIdFactory;
+ }
+
+ // **********
+
+ private IDuccIdFactory serviceDuccIdFactory = null;
+
+ private void setServiceDuccIdFactory(IDuccIdFactory instance) {
+ serviceDuccIdFactory = instance;
+ }
+
+ public IDuccIdFactory getServiceDuccIdFactory() {
+ return serviceDuccIdFactory;
+ }
+
+ // **********
+
+ private IDuccIdFactory reservationDuccIdFactory = null;
+
+ private void setReservationDuccIdFactory(IDuccIdFactory instance) {
+ reservationDuccIdFactory = instance;
+ }
+
+ public IDuccIdFactory getReservationDuccIdFactory() {
+ return reservationDuccIdFactory;
+ }
+
+ // **********
+
+ @SuppressWarnings("unchecked")
+ public Checkpointable getCheckpointable() {
+ String methodName = "getCheckpointable";
+ DuccWorkMap ckptWorkMap;
+ ConcurrentHashMap<DuccId,DuccId> ckptProcessToJobMap;
+ long t0 = System.currentTimeMillis();
+ synchronized(workMap) {
+ ckptWorkMap = (DuccWorkMap)SerializationUtils.clone(workMap);
+ ckptProcessToJobMap = (ConcurrentHashMap<DuccId,DuccId>)SerializationUtils.clone(processAccounting.getProcessToJobMap());
+ }
+ long t1 = System.currentTimeMillis();
+ long elapsed = t1 - t0;
+ if(elapsed > Constants.SYNC_LIMIT) {
+ logger.debug(methodName, null, "elapsed msecs: "+elapsed);
+ }
+ return new Checkpointable(ckptWorkMap,ckptProcessToJobMap);
+ }
+
+ public void setCheckpointable(Checkpointable checkpointable) {
+ synchronized(workMap) {
+ workMap = checkpointable.getWorkMap();
+ processAccounting = new ProcessAccounting(checkpointable.getProcessToJobMap());
+ }
+ }
+
+ // **********
+
+ private ProcessAccounting processAccounting;
+
+ public ProcessAccounting getProcessAccounting() {
+ return processAccounting;
+ }
+
+ // **********
+
+ private DuccWorkMap workMap = null;
+
+ public DuccWorkMap getWorkMap() {
+ return workMap;
+ }
+
+ public void setWorkMap(DuccWorkMap workMap) {
+ this.workMap = workMap;
+ }
+
+ // **********
+
+ private ConcurrentHashMap<DuccId,DriverStatusReport> driverStatusReportMap = null;
+
+ public ConcurrentHashMap<DuccId,DriverStatusReport> getDriverStatusReportMap() {
+ return driverStatusReportMap;
+ }
+
+ public void setDriverStatusReportMap(ConcurrentHashMap<DuccId,DriverStatusReport> driverStatusReportMap) {
+ this.driverStatusReportMap = driverStatusReportMap;
+ }
+
+ // **********
+
+ private Messages systemMessages= Messages.getInstance();
+ private Messages userMessages= Messages.getInstance();
+
+ public void initSystemMessages(String language, String country) {
+ systemMessages = Messages.getInstance(language,country);
+ }
+
+ public void initUserMessages(String language, String country) {
+ userMessages = Messages.getInstance(language,country);
+ }
+
+ public Messages getSystemMessages() {
+ return systemMessages;
+ }
+
+ public Messages getUserMessages() {
+ return userMessages;
+ }
+
+ // **********
+
+ private JobDriverHostManager hostManager = null;
+
+ public JobDriverHostManager getHostManager() {
+ return hostManager;
+ }
+
+ // **********
+
+ public HistoryPersistenceManager getHistoryPersistencemanager() {
+ return historyPersistenceManager;
+ }
+
+ // **********
+
+ private boolean signatureRequired = true;
+
+ public void setSignatureRequired() {
+ signatureRequired = true;
+ }
+
+ public void resetSignatureRequired() {
+ signatureRequired = false;
+ }
+
+ public boolean isSignatureRequired() {
+ return signatureRequired;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/main/java/org/apache/uima/ducc/orchestrator/OrchestratorCommonArea.java
------------------------------------------------------------------------------
svn:eol-style = native