You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2013/01/02 20:49:56 UTC
svn commit: r1427978 [6/7] - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src: main/ main/java/
main/java/org/ main/java/org/apache/ main/java/org/apache/uima/
main/java/org/apache/uima/ducc/ main/java/org/apache/uima/ducc/transport/
main/java/...
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/JobManagerStateReconciler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/JobManagerStateReconciler.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/JobManagerStateReconciler.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/JobManagerStateReconciler.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+import java.util.Map;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.cmdline.ICommandLine;
+import org.apache.uima.ducc.transport.event.common.IDuccWork;
+
+
+/**
+ * Reconciles two given Maps containing Job Manager's state. The left Map
+ * is what Job Manager posts at regular intervals. It contains the most up to date
+ * information about Jobs and Processes. The right Map is the local Map that a
+ * component maintains. It reflects the state of the Job Manager and it should stay in
+ * sync with the left Map. The reconciliation is done across Jobs as well as Process
+ * Maps. The reconciliation detects:
+ * <ul>
+ * <li>Jobs added</li>
+ * <li>Jobs removed</li>
+ * <li>Processes added to Jobs</li>
+ * <li>Processes removed from Jobs</li>
+ * <li>Processes in both Job Process Maps that are different (by state, PID)</li>
+ * </ul>
+ * <p>
+ * This class uses registered callback handlers to notify an application of events
+ * related to what the reconciler detects. An application must register the following
+ * callback handlers:
+ * <ul>
+ * <li><@link JobChangesHandler></li>
+ * <li><@link JobProcessChangesHandler)></li>
+ * </ul>
+ * The <@code reconcile()> method will fail if callback handlers are not injected
+ * prior to calling it.
+ * <p>
+ * The reconciliation is performed in multiple stages. First, new Jobs are detected and
+ * added to an intermediate Map which is than passed as an argument to
+ * {@code JobChangesHandler.onNewJobs()} handler. Next, removed Jobs are detected and
+ * added to an intermediate Map which is than passed as an argument to
+ * {@code JobChangesHandler.onRemovedJobs()} handler. Next, on per Job basis, new
+ * Processes are detected and added to an intermediate Map which is than passed as
+ * an argument to {@code JobProcessChangesHandler.onNewJobProcesses()} handler. Following,
+ * that, on per Job basis, removed Processes are detected and added to an intermediate
+ * Map which is than passed as an argument to
+ * {@code JobProcessChangesHandler.onRemovedJobProcesses()} handler. Finally, for
+ * each Process that exists in both Process Maps, changes are detected and those
+ * Processes are passed as arguments to {@code JobProcessChangesHandler.onProcessChanges()}
+ * handler.
+ *
+ */
+public interface JobManagerStateReconciler {
+ /**
+ * Main method the reconciles Job Manager's state.
+ *
+ * @param left - most current state Map sent by Job Manager
+ * @param right - current Job Manager state Map maintained by a component
+ */
+ public void reconcile(Map<DuccId, IDuccWork> left, Map<DuccId, IDuccWork> right);
+
+ /**
+ * Injects callback listener to receive Job changes
+ *
+ * @param callback - callback to notify of job changes
+ */
+ public void setWorkChangesHandler(WorkChangesHandler callback);
+ /**
+ * Injects callback listener to receive Job process changes
+ *
+ * @param callback - callback to notify of process changes
+ */
+ public void setWorkProcessChanges(WorkProcessChangesHandler callback);
+
+ /**
+ * Callback listener to receive notifications when job changes are detected
+ * during reconciliation. An application *must* inject instance of this
+ * callback listener *before* calling reconcile() method.
+ *
+ */
+ public interface WorkChangesHandler {
+ /**
+ * Called when new Jobs are detected during reconciliation. This method
+ * is called once, when the reconciliation finishes.
+ *
+ * @param newJobsMap - map containing new jobs
+ */
+ public void onNewWork(Map<DuccId, IDuccWork> newWorkMap);
+ /**
+ * Called when removed Jobs are detected during reconciliation. This method
+ * is called once, when the reconciliation finishes.
+ *
+ * @param removedJobsMap - map containing removed jobs
+ */
+ public void onRemovedWork(Map<DuccId, IDuccWork> removedWorkMap);
+
+ /**
+ * Called when a Job in both Job Maps has a different internal state. That
+ * can be due to status change, etc. This method is called once per each
+ * Job that has a different state in both Maps.
+ *
+ * @param left - Job with a new internal state
+ * @param right - local Job which must be sync'ed with left
+ */
+ public void onWorkChanges(IDuccWork left, IDuccWork right);
+ }
+ /**
+ * Callback listener to receive notifications when job's process changes are
+ * detected during reconciliation. An application *must* inject instance of this
+ * callback listener *before* calling reconcile() method.
+ *
+ */
+ public interface WorkProcessChangesHandler {
+ /**
+ * Called when new processes are added to existing Jobs. This method
+ * can be called multiple times. It is called for each Job whose process(es)
+ * were added.
+ *
+ * @param newJobProcessMap - Map containing new processes
+ * @param newJobProcessMapToUpdate - local Process Map to update
+ */
+ public void onNewWorkProcesses(IDuccWork work, ICommandLine commandLine, Map<DuccId, IDuccProcess> newWorkProcessMap, Map<DuccId, IDuccProcess> newWorkProcessMapToUpdate);
+ /**
+ * Called when removed processes are detected. This method can be called
+ * multiple times. It is called for each Job whose process(es) where removed.
+ *
+ * @param removedJobProcessMap - Map containing removed processes
+ * @param newJobProcessMapToUpdate - local Process Map to update
+ */
+ public void onRemovedWorkProcesses(DuccId jobId, Map<DuccId, IDuccProcess> removedWorkProcessMap, Map<DuccId, IDuccProcess> newWorkProcessMapToUpdate);
+ /**
+ * Called when a Process in both Process Maps has a different internal state. That
+ * can be due to assigned PID, status, etc. This method is called once per each
+ * process that has a different state in both Maps.
+ *
+ * @param left - Process with a new internal state
+ * @param right - local Process which must be sync'ed with left
+ */
+ public void onProcessChanges(IDuccWork job, IDuccProcess left, IDuccProcess right);
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/JobManagerStateReconciler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Rationale.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Rationale.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Rationale.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Rationale.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+public class Rationale implements IRationale {
+
+ private static final long serialVersionUID = 1L;
+
+ private String text = null;
+
+ public Rationale() {
+ setText("unspecified");
+ }
+
+ public Rationale(String text) {
+ setText(text);
+ }
+
+ @Override
+ public String getText() {
+ return text;
+ }
+
+ private void setText(String text) {
+ this.text = text;
+ }
+
+ @Override
+ public String toString() {
+ return getText();
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Rationale.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/TimeWindow.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/TimeWindow.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/TimeWindow.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/TimeWindow.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class TimeWindow implements ITimeWindow {
+
+ /**
+ * please increment this sUID when removing or modifying a field
+ */
+ private static final long serialVersionUID = 1L;
+ private String timeStart = null;
+ private String timeEnd = null;
+
+ public TimeWindow() {
+ }
+
+ @Override
+ public String getStart() {
+ return timeStart;
+ }
+
+ @Override
+ public void setStart(String time) {
+ this.timeStart = time;
+ }
+
+ @Override
+ public long getStartLong() {
+ long retVal = -1;
+ try {
+ retVal = Long.parseLong(getStart());
+ }
+ catch(Exception e) {
+ }
+ return retVal;
+ }
+
+ @Override
+ public void setStartLong(long time) {
+ setStart(String.valueOf(time));
+ }
+
+ @Override
+ public String getEnd() {
+ return timeEnd;
+ }
+
+ @Override
+ public void setEnd(String time) {
+ this.timeEnd = time;
+ }
+
+ @Override
+ public long getEndLong() {
+ long retVal = -1;
+ try {
+ retVal = Long.parseLong(getEnd());
+ }
+ catch(Exception e) {
+ }
+ return retVal;
+ }
+
+ @Override
+ public void setEndLong(long time) {
+ setEnd(String.valueOf(time));
+ }
+
+ @Override
+ public String getDiff() {
+ return ""+getElapsedMillis();
+ }
+
+ @Override
+ public String getElapsed() {
+ String elapsed = "";
+ long elapsedTime = Long.valueOf(getDiff());
+ SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ elapsed = dateFormat.format(new Date(elapsedTime));
+ return elapsed;
+ }
+
+ @Override
+ public String getElapsed(IDuccWorkJob job) {
+ String retVal = null;
+ if(isEstimated()) {
+ //if(job.isCompleted()) {
+ long current = System.currentTimeMillis();
+ long elapsed = getElapsedMillis();
+ IDuccStandardInfo stdInfo = job.getStandardInfo();
+ long t1 = stdInfo.getDateOfCompletionMillis();
+ if(t1 == 0) {
+ t1 = current;
+ }
+ long t0 = stdInfo.getDateOfSubmissionMillis();
+ if(t0 == 0) {
+ t0 = current;
+ }
+ long tmax = t1-t0;
+ if(elapsed > tmax) {
+ elapsed = t1 - getStartLong();
+ }
+ SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ retVal = dateFormat.format(new Date(elapsed));
+ //}
+ }
+ else {
+ retVal = getElapsed();
+ }
+ return retVal;
+ }
+
+ @Override
+ public long getElapsedMillis() {
+ String t0 = getStart();
+ String t1 = getEnd();
+ String t = ""+System.currentTimeMillis();
+ if(t0 == null) {
+ t0 = t;
+ }
+ if(t1 == null) {
+ t1 = t;
+ }
+ Long l1 = Long.valueOf(t1);
+ Long l0 = Long.valueOf(t0);
+ Long diff = l1-l0;
+ return diff.longValue();
+ }
+
+ @Override
+ public boolean isEstimated() {
+ boolean retVal = false;
+ if(getStart() == null) {
+ retVal = true;
+ }
+ else if(getEnd() == null) {
+ retVal = true;
+ }
+ return retVal;
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/TimeWindow.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Util.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Util.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Util.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Util.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common;
+
+public class Util {
+
+ public static boolean compare(Object obj1,Object obj2) {
+ boolean retVal = false;
+ if (obj1 == null) {
+ if (obj2 == null) {
+ retVal = true;
+ }
+ }
+ else {
+ if(obj1.equals(obj2)) {
+ retVal = true;
+ }
+ }
+ return retVal;
+ }
+
+ public static boolean compareNotNull(Object obj1,Object obj2) {
+ boolean retVal = false;
+ if (obj1 != null) {
+ if (obj2 != null) {
+ if(obj1.equals(obj2)) {
+ retVal = true;
+ }
+ }
+ }
+ return retVal;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/Util.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/HistoryPersistenceManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/HistoryPersistenceManager.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/HistoryPersistenceManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/HistoryPersistenceManager.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common.history;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.ListIterator;
+
+import org.apache.uima.ducc.common.IDuccEnv;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.IOHelper;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+
+
+public class HistoryPersistenceManager implements IHistoryPersistenceManager {
+
+ private static HistoryPersistenceManager instance = new HistoryPersistenceManager();
+
+ public static HistoryPersistenceManager getInstance() {
+ return instance;
+ }
+
+ private static final DuccLogger logger = DuccLoggerComponents.getTrLogger(HistoryPersistenceManager.class.getName());
+
+ private String historyDirectory_jobs = IDuccEnv.DUCC_HISTORY_JOBS_DIR;
+ private String historyDirectory_reservations = IDuccEnv.DUCC_HISTORY_RESERVATIONS_DIR;
+ private String historyDirectory_services = IDuccEnv.DUCC_HISTORY_SERVICES_DIR;
+
+ private String dwj = "dwj";
+ private String dwr = "dwr";
+ private String dws = "dws";
+
+ private enum Verbosity {
+ QUIET,
+ SPEAK,
+ }
+
+ public HistoryPersistenceManager() {
+ mkdirs();
+ }
+
+ private void mkdirs() {
+ IOHelper.mkdirs(historyDirectory_jobs);
+ IOHelper.mkdirs(historyDirectory_reservations);
+ IOHelper.mkdirs(historyDirectory_services);
+ }
+
+ private String normalize(String id) {
+ String retVal = id;
+ return retVal;
+ }
+
+ @Override
+ public void jobSaveConditional(IDuccWorkJob duccWorkJob) throws IOException {
+ String id = normalize(""+duccWorkJob.getDuccId().getFriendly());
+ String fileName = historyDirectory_jobs+File.separator+id+"."+dwj;
+ File file = new File(fileName);
+ if(!file.exists()) {
+ jobSave(duccWorkJob);
+ }
+ }
+
+ @Override
+ public void jobSave(IDuccWorkJob duccWorkJob) throws IOException {
+ String id = normalize(""+duccWorkJob.getDuccId().getFriendly());
+ String fileName = historyDirectory_jobs+File.separator+id+"."+dwj;
+ FileOutputStream fos = null;
+ ObjectOutputStream out = null;
+ fos = new FileOutputStream(fileName);
+ out = new ObjectOutputStream(fos);
+ out.writeObject(duccWorkJob);
+ out.close();
+ }
+ @Override
+ public IDuccWorkJob jobRestore(String fileName) {
+ return jobRestore(fileName, Verbosity.SPEAK);
+ }
+
+ private IDuccWorkJob jobRestore(String fileName, Verbosity level) {
+ String methodName = "jobRestore";
+ IDuccWorkJob job = null;
+ try {
+ logger.trace(methodName, null, "restore:"+fileName);
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ fis = new FileInputStream(historyDirectory_jobs+File.separator+fileName);
+ in = new ObjectInputStream(fis);
+ job = (IDuccWorkJob) in.readObject();
+ in.close();
+ }
+ catch(Exception e) {
+ switch(level) {
+ case QUIET:
+ break;
+ case SPEAK:
+ logger.warn(methodName, null, "unable to restore:"+fileName, e);
+ break;
+ }
+ }
+ return job;
+ }
+
+ @Override
+ public IDuccWorkJob jobRestore(DuccId duccId) {
+ String fileName = duccId.getFriendly()+"."+dwj;
+ return jobRestore(fileName, Verbosity.QUIET);
+ }
+
+ @Override
+ public ArrayList<String> jobList() {
+ ArrayList<String> retVal = new ArrayList<String>();
+ File folder = new File(historyDirectory_jobs);
+ File[] listOfFiles = folder.listFiles();
+ if(listOfFiles != null) {
+ for (int i = 0; i < listOfFiles.length; i++) {
+ if (listOfFiles[i].isFile()) {
+ String name = listOfFiles[i].getName();
+ if(name.endsWith("."+dwj)) {
+ retVal.add(name);
+ }
+ }
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public ArrayList<IDuccWorkJob> jobRestore() throws IOException, ClassNotFoundException {
+ ArrayList<IDuccWorkJob> retVal = new ArrayList<IDuccWorkJob>();
+ ArrayList<String> jobFileNames = jobList();
+ ListIterator<String> listIterator = jobFileNames.listIterator();
+ while(listIterator.hasNext()) {
+ String fileName = listIterator.next();
+ IDuccWorkJob job = jobRestore(fileName);
+ if(job != null) {
+ retVal.add(job);
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public void reservationSaveConditional(IDuccWorkReservation duccWorkReservation) throws IOException {
+ String id = normalize(""+duccWorkReservation.getDuccId().getFriendly());
+ String fileName = historyDirectory_jobs+File.separator+id+"."+dwr;
+ File file = new File(fileName);
+ if(!file.exists()) {
+ reservationSave(duccWorkReservation);
+ }
+ }
+
+ @Override
+ public void reservationSave(IDuccWorkReservation duccWorkReservation) throws IOException {
+ String id = normalize(""+duccWorkReservation.getDuccId().getFriendly());
+ String fileName = historyDirectory_reservations+File.separator+id+"."+dwr;
+ FileOutputStream fos = null;
+ ObjectOutputStream out = null;
+ fos = new FileOutputStream(fileName);
+ out = new ObjectOutputStream(fos);
+ out.writeObject(duccWorkReservation);
+ out.close();
+ }
+
+ @Override
+ public IDuccWorkReservation reservationRestore(String fileName) {
+ return reservationRestore(fileName, Verbosity.SPEAK);
+ }
+
+ private IDuccWorkReservation reservationRestore(String fileName, Verbosity level) {
+ String methodName = "reservationRestore";
+ IDuccWorkReservation reservation = null;
+ try {
+ logger.trace(methodName, null, "restore:"+fileName);
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ fis = new FileInputStream(historyDirectory_reservations+File.separator+fileName);
+ in = new ObjectInputStream(fis);
+ reservation = (IDuccWorkReservation) in.readObject();
+ in.close();
+ }
+ catch(Exception e) {
+ switch(level) {
+ case QUIET:
+ break;
+ case SPEAK:
+ logger.warn(methodName, null, "unable to restore:"+fileName);
+ break;
+ }
+ }
+ return reservation;
+ }
+
+ @Override
+ public ArrayList<String> reservationList() {
+ ArrayList<String> retVal = new ArrayList<String>();
+ File folder = new File(historyDirectory_reservations);
+ File[] listOfFiles = folder.listFiles();
+ if(listOfFiles != null) {
+ for (int i = 0; i < listOfFiles.length; i++) {
+ if (listOfFiles[i].isFile()) {
+ String name = listOfFiles[i].getName();
+ if(name.endsWith("."+dwr)) {
+ retVal.add(name);
+ }
+ }
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public ArrayList<IDuccWorkReservation> reservationRestore() throws IOException, ClassNotFoundException {
+ ArrayList<IDuccWorkReservation> retVal = new ArrayList<IDuccWorkReservation>();
+ ArrayList<String> reservationFileNames = reservationList();
+ ListIterator<String> listIterator = reservationFileNames.listIterator();
+ while(listIterator.hasNext()) {
+ String fileName = listIterator.next();
+ IDuccWorkReservation reservation = reservationRestore(fileName);
+ if(reservation != null) {
+ retVal.add(reservation);
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public IDuccWorkReservation reservationRestore(DuccId duccId) {
+ String fileName = duccId.getFriendly()+"."+dwr;
+ return reservationRestore(fileName, Verbosity.QUIET);
+ }
+
+ @Override
+ public void serviceSaveConditional(IDuccWorkService duccWorkService)
+ throws IOException {
+ String id = normalize(""+duccWorkService.getDuccId().getFriendly());
+ String fileName = historyDirectory_services+File.separator+id+"."+dws;
+ File file = new File(fileName);
+ if(!file.exists()) {
+ serviceSave(duccWorkService);
+ }
+ }
+
+ @Override
+ public void serviceSave(IDuccWorkService duccWorkService)
+ throws IOException {
+ String id = normalize(""+duccWorkService.getDuccId().getFriendly());
+ String fileName = historyDirectory_services+File.separator+id+"."+dws;
+ FileOutputStream fos = null;
+ ObjectOutputStream out = null;
+ fos = new FileOutputStream(fileName);
+ out = new ObjectOutputStream(fos);
+ out.writeObject(duccWorkService);
+ out.close();
+ }
+
+ @Override
+ public IDuccWorkService serviceRestore(String fileName) {
+ return serviceRestore(fileName, Verbosity.SPEAK);
+ }
+
+ private IDuccWorkService serviceRestore(String fileName, Verbosity level) {
+ String methodName = "serviceRestore";
+ IDuccWorkService service = null;
+ try {
+ logger.trace(methodName, null, "restore:"+fileName);
+ FileInputStream fis = null;
+ ObjectInputStream in = null;
+ fis = new FileInputStream(historyDirectory_services+File.separator+fileName);
+ in = new ObjectInputStream(fis);
+ service = (IDuccWorkService) in.readObject();
+ in.close();
+ }
+ catch(Exception e) {
+ switch(level) {
+ case QUIET:
+ break;
+ case SPEAK:
+ logger.warn(methodName, null, "unable to restore:"+fileName);
+ break;
+ }
+ }
+ return service;
+ }
+
+ @Override
+ public ArrayList<String> serviceList() {
+ ArrayList<String> retVal = new ArrayList<String>();
+ File folder = new File(historyDirectory_services);
+ File[] listOfFiles = folder.listFiles();
+ if(listOfFiles != null) {
+ for (int i = 0; i < listOfFiles.length; i++) {
+ if (listOfFiles[i].isFile()) {
+ String name = listOfFiles[i].getName();
+ if(name.endsWith("."+dws)) {
+ retVal.add(name);
+ }
+ }
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public ArrayList<IDuccWorkService> serviceRestore() throws IOException,
+ ClassNotFoundException {
+ ArrayList<IDuccWorkService> retVal = new ArrayList<IDuccWorkService>();
+ ArrayList<String> serviceFileNames = serviceList();
+ ListIterator<String> listIterator = serviceFileNames.listIterator();
+ while(listIterator.hasNext()) {
+ String fileName = listIterator.next();
+ IDuccWorkService service = serviceRestore(fileName);
+ if(service != null) {
+ retVal.add(service);
+ }
+ }
+ return retVal;
+ }
+
+ @Override
+ public IDuccWorkService serviceRestore(DuccId duccId) {
+ String fileName = duccId.getFriendly()+"."+dws;
+ return serviceRestore(fileName, Verbosity.QUIET);
+ }
+
+ ///// <test>
+
+ private static int doJobs(HistoryPersistenceManager hpm) throws IOException, ClassNotFoundException {
+ ArrayList<IDuccWorkJob> duccWorkJobs = hpm.jobRestore();
+ ListIterator<IDuccWorkJob> listIterator = duccWorkJobs.listIterator();
+ int acc = 0;
+ while(listIterator.hasNext()) {
+ IDuccWorkJob duccWorkJob = listIterator.next();
+ System.out.println(duccWorkJob.getId());
+ acc++;
+ }
+ return acc;
+ }
+
+ private static int doReservations(HistoryPersistenceManager hpm) throws IOException, ClassNotFoundException {
+ ArrayList<IDuccWorkReservation> duccWorkReservations = hpm.reservationRestore();
+ ListIterator<IDuccWorkReservation> listIterator = duccWorkReservations.listIterator();
+ int acc = 0;
+ while(listIterator.hasNext()) {
+ IDuccWorkReservation duccWorkReservation = listIterator.next();
+ System.out.println(duccWorkReservation.getId());
+ acc++;
+ }
+ return acc;
+ }
+
+ public static void main(String[] args) throws IOException, ClassNotFoundException {
+ String ducc_home = System.getenv("DUCC_HOME");
+ if(ducc_home == null) {
+ System.out.println("DUCC_HOME not set in environment");
+ return;
+ }
+ if(ducc_home.trim() == "") {
+ System.out.println("DUCC_HOME not set in environment");
+ return;
+ }
+ HistoryPersistenceManager hpm = new HistoryPersistenceManager();
+ int jobs = doJobs(hpm);
+ System.out.println("jobs: "+jobs);
+ int reservations = doReservations(hpm);
+ System.out.println("reservations: "+reservations);
+ }
+
+ ///// </test>
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/HistoryPersistenceManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.common.history;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkJob;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation;
+import org.apache.uima.ducc.transport.event.common.IDuccWorkService;
+
+
+public interface IHistoryPersistenceManager {
+
+ public void jobSaveConditional(IDuccWorkJob duccWorkJob) throws IOException;
+ public void jobSave(IDuccWorkJob duccWorkJob) throws IOException;
+ public IDuccWorkJob jobRestore(String fileName);
+ public IDuccWorkJob jobRestore(DuccId duccId);
+ public ArrayList<String> jobList();
+ public ArrayList<IDuccWorkJob> jobRestore() throws IOException, ClassNotFoundException;
+
+ public void reservationSaveConditional(IDuccWorkReservation duccWorkReservation) throws IOException;
+ public void reservationSave(IDuccWorkReservation duccWorkReservation) throws IOException;
+ public IDuccWorkReservation reservationRestore(String fileName);
+ public IDuccWorkReservation reservationRestore(DuccId duccId);
+ public ArrayList<String> reservationList();
+ public ArrayList<IDuccWorkReservation> reservationRestore() throws IOException, ClassNotFoundException;
+
+ public void serviceSaveConditional(IDuccWorkService duccWorkService) throws IOException;
+ public void serviceSave(IDuccWorkService duccWorkService) throws IOException;
+ public IDuccWorkService serviceRestore(String fileName);
+ public IDuccWorkService serviceRestore(DuccId duccId);
+ public ArrayList<String> serviceList();
+ public ArrayList<IDuccWorkService> serviceRestore() throws IOException, ClassNotFoundException;
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/delegate/DuccEventDelegateListener.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/delegate/DuccEventDelegateListener.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/delegate/DuccEventDelegateListener.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/delegate/DuccEventDelegateListener.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.delegate;
+
+import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
+
+public interface DuccEventDelegateListener {
+ public void setDuccEventDispatcher( DuccEventDispatcher eventDispatcher );
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/delegate/DuccEventDelegateListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+import org.apache.uima.ducc.common.utils.DuccLoggerComponents;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.IDuccPerWorkItemStatistics;
+import org.apache.uima.ducc.transport.event.common.IDuccUimaDeploymentDescriptor;
+import org.apache.uima.ducc.transport.event.common.IRationale;
+import org.apache.uima.ducc.transport.event.common.Rationale;
+import org.apache.uima.ducc.transport.event.common.Util;
+import org.apache.uima.ducc.transport.event.common.IDuccCompletionType.JobCompletionType;
+import org.apache.uima.ducc.transport.event.jd.IDriverState.DriverState;
+
+
+@SuppressWarnings("serial")
+public class DriverStatusReport implements Serializable {
+
+ private static DuccLogger duccOut = DuccLoggerComponents.getJdOut(DriverStatusReport.class.getName());
+ private static DuccId jobid = null;
+
+ private DuccId duccId = null;
+ private String jdJmxUrl = null;
+
+ private volatile DriverState driverState = DriverState.NotRunning;
+ private JobCompletionType jobCompletionType = JobCompletionType.Undefined;
+ private IRationale jobCompletionRationale = null;
+
+ private long now = 0;
+ private long clientInitStart = 0;
+ private long clientInitEnd = 0;
+
+ private AtomicBoolean terminateDriver = new AtomicBoolean(false);
+
+ private AtomicBoolean atLeastOneService = new AtomicBoolean(false);
+
+ private AtomicBoolean workItemsPending = new AtomicBoolean(true);
+
+ private AtomicLong workItemsTotal = new AtomicLong(-1);
+
+ private AtomicInteger workItemsFetched = new AtomicInteger(0);
+
+ private AtomicInteger workItemsProcessingStarted = new AtomicInteger(0);
+ private AtomicInteger workItemsProcessingCompleted = new AtomicInteger(0);
+ private AtomicInteger workItemsProcessingError = new AtomicInteger(0);
+
+ //private AtomicInteger workItemsQueued = new AtomicInteger(0);
+ //private AtomicInteger workItemsDequeued = new AtomicInteger(0);
+
+ private AtomicInteger workItemsRetry = new AtomicInteger(0);
+ private AtomicInteger workItemsPreempted = new AtomicInteger(0);
+
+ private AtomicInteger threadCount = new AtomicInteger(0);
+
+ private ConcurrentHashMap<String,DuccId> casQueuedMap = new ConcurrentHashMap<String,DuccId>();
+ private ConcurrentHashMap<String,HashMap<String,String>> casOperatingMap = new ConcurrentHashMap<String,HashMap<String,String>>();
+
+ private ConcurrentHashMap<Integer,DuccId> limboMap = new ConcurrentHashMap<Integer,DuccId>();
+
+ private ConcurrentHashMap<DuccId,String> killProcessMap = new ConcurrentHashMap<DuccId,String>();
+ private AtomicBoolean killJob = new AtomicBoolean(false);
+
+ private AtomicLong mostRecentWorkItemStart = new AtomicLong(0);
+
+ private IDuccPerWorkItemStatistics perWorkItemStatistics = null;
+ @Deprecated
+ private PerformanceMetricsSummaryMap performanceMetricsSummaryMap = null;
+
+ private ConcurrentHashMap<String,String> pendingProcessAssignmentMap = new ConcurrentHashMap<String,String>();
+
+ private IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor = null;
+
+ private DuccProcessWorkItemsMap duccProcessWorkItemsMap = new DuccProcessWorkItemsMap();
+
+ public DriverStatusReport(DuccId duccId, String jdJmxUrl) {
+ setJdJmxUrl(jdJmxUrl);
+ setDuccId(duccId);
+ }
+
+ /*
+ * DuccId
+ */
+ public void setDuccId(DuccId duccId) {
+ this.duccId = duccId;
+ }
+
+ public DuccId getDuccId() {
+ return duccId;
+ }
+
+ /*
+ * Id
+ */
+ public String getId() {
+ return duccId.toString();
+ }
+
+ /*
+ * DuccProcessWorkItemsMap
+ */
+ public void setDuccProcessWorkItemsMap(DuccProcessWorkItemsMap duccProcessWorkItemsMap) {
+ this.duccProcessWorkItemsMap = duccProcessWorkItemsMap;
+ }
+
+ public DuccProcessWorkItemsMap getDuccProcessWorkItemsMap() {
+ return duccProcessWorkItemsMap;
+ }
+
+ /*
+ * JdJmxUrl
+ */
+ public void setJdJmxUrl(String jdJmxUrl) {
+ this.jdJmxUrl = jdJmxUrl;
+ }
+
+ public String getJdJmxUrl() {
+ return jdJmxUrl;
+ }
+
+ /*
+ * hashCode
+ */
+ public int getHashCode() {
+ return duccId.hashCode();
+ }
+
+ private void setDriverState(DriverState driverState) {
+ String methodName = "setDriverState";
+ synchronized(this) {
+ String prev = getDriverState().toString();
+ switch(this.driverState) {
+ case Completed:
+ break;
+ default:
+ this.driverState = driverState;
+ }
+ String curr = getDriverState().toString();
+ duccOut.debug(methodName, duccId, "current:"+curr+" "+"previous:"+prev);
+ }
+ }
+
+ public DriverState getDriverState() {
+ String methodName = "getDriverState";
+ synchronized(this) {
+ String curr = driverState.toString();
+ duccOut.debug(methodName, duccId, "current:"+curr);
+ return driverState;
+ }
+ }
+
+ public boolean isTerminateDriver() {
+ return terminateDriver.get();
+ }
+
+ public void setTerminateDriver() {
+ terminateDriver.set(true);
+ calculateState();
+ }
+
+
+ private void setJobCompletion(JobCompletionType jobCompletionType, IRationale rationale) {
+ this.jobCompletionType = jobCompletionType;
+ this.jobCompletionRationale = rationale;
+ }
+
+ /*
+ private void setJobCompletionType(JobCompletionType jobCompletionType) {
+ this.jobCompletionType = jobCompletionType;
+ }
+ */
+
+ public JobCompletionType getJobCompletionType() {
+ return jobCompletionType;
+ }
+
+ /*
+ private void setJobCompletionRationale(IRationale rationale) {
+ this.jobCompletionRationale = rationale;
+ }
+ */
+
+ public IRationale getJobCompletionRationale() {
+ return jobCompletionRationale;
+ }
+
+ public void setClientInitStart(long time) {
+ clientInitStart = time;
+ }
+
+ public long getClientInitStart() {
+ return clientInitStart;
+ }
+
+ public void setClientInitEnd(long time) {
+ clientInitEnd = time;
+ }
+
+ public long getClientInitEnd() {
+ return clientInitEnd;
+ }
+
+ public void setNow() {
+ now = System.currentTimeMillis();
+ }
+
+ public long getNow() {
+ return now;
+ }
+
+ public void setInitializing() {
+ setClientInitStart(System.currentTimeMillis());
+ setClientInitEnd(0);
+ setDriverState(DriverState.Initializing);
+ logReport();
+ }
+
+ public void setInitializingCompleted() {
+ setClientInitEnd(System.currentTimeMillis());
+ setDriverState(DriverState.Idle);
+ logReport();
+ }
+
+ public void setInitializingFailed(IRationale rationale) {
+ setClientInitEnd(System.currentTimeMillis());
+ setDriverState(DriverState.Completed);
+ setJobCompletion(JobCompletionType.DriverInitializationFailure, rationale);
+ logReport();
+ }
+
+ public void setExcessiveInitializationFailures(IRationale rationale) {
+ setClientInitEnd(System.currentTimeMillis());
+ setDriverState(DriverState.Completed);
+ setJobCompletion(JobCompletionType.ProcessInitializationFailure, rationale);
+ logReport();
+ }
+
+ public boolean getAtLeastOneService() {
+ String methodName = "getAtLeastOneService";
+ boolean retVal = atLeastOneService.get();
+ duccOut.debug(methodName, jobid, retVal);
+ return retVal;
+ }
+
+ public void setAtLeastOneService() {
+ String methodName = "setAtLeastOneService";
+ if(!atLeastOneService.get()) {
+ atLeastOneService.set(true);
+ duccOut.debug(methodName, jobid, atLeastOneService.get());
+ calculateState();
+ logReport();
+ }
+ }
+
+ public void setWorkItemsPending() {
+ String methodName = "setWorkItemsPending";
+ if(!workItemsPending.getAndSet(true)) {
+ duccOut.debug(methodName, jobid, true);
+ calculateState();
+ logReport();
+ }
+ }
+
+ public void resetWorkItemsPending() {
+ String methodName = "resetWorkItemsPending";
+ if(workItemsPending.getAndSet(false)) {
+ duccOut.debug(methodName, jobid, false);
+ calculateState();
+ logReport();
+ }
+ }
+
+ public boolean isPending() {
+ String methodName = "isPending";
+ boolean retVal = workItemsPending.get();
+ duccOut.debug(methodName, jobid, retVal);
+ return retVal;
+ }
+
+ public void setWorkItemsTotal(long total) {
+ workItemsTotal.set(total);
+ logReport();
+ }
+
+ public long getWorkItemsTotal() {
+ return workItemsTotal.get();
+ }
+
+ public void setWorkItemsFetched(int update) {
+ int expect = workItemsFetched.get();
+ while(expect < update) {
+ workItemsFetched.compareAndSet(expect, update);
+ expect = workItemsFetched.get();
+ }
+ logReport();
+ }
+
+ public int getWorkItemsFetched() {
+ return workItemsFetched.get();
+ }
+
+ public void setMostRecentStart(long time) {
+ mostRecentWorkItemStart.set(time);
+ }
+
+ public long getMostRecentStart() {
+ return mostRecentWorkItemStart.get();
+ }
+
+ public void countWorkItemsProcessingStarted() {
+ workItemsProcessingStarted.incrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public int getWorkItemsProcessingStarted() {
+ return workItemsProcessingStarted.get();
+ }
+
+ public void countWorkItemsProcessingCompleted() {
+ workItemsProcessingCompleted.incrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public int getWorkItemsProcessingCompleted() {
+ return workItemsProcessingCompleted.get();
+ }
+
+ public int getWorkItemsOperating() {
+ return casOperatingMap.size();
+ }
+
+ public void countWorkItemsProcessingError() {
+ workItemsProcessingError.incrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public int getWorkItemsProcessingError() {
+ return workItemsProcessingError.get();
+ }
+
+ public void countWorkItemsRetry() {
+ workItemsRetry.incrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public int getWorkItemsRetry() {
+ return workItemsRetry.get();
+ }
+
+ public void countWorkItemsPreempted() {
+ workItemsPreempted.incrementAndGet();
+ calculateState();
+ logReport();
+ }
+
+ public int getWorkItemsPreempted() {
+ return workItemsPreempted.get();
+ }
+
+ public int getThreadCount() {
+ return threadCount.get();
+ }
+
+ public void setThreadCount(int threadCount) {
+ this.threadCount.set(threadCount);
+ }
+
+ public void killProcess(DuccId processId, String casId) {
+ killProcessMap.put(processId, casId);
+ }
+
+ public boolean isKillProcess(DuccId processId) {
+ boolean retVal = false;
+ if(killProcessMap.containsKey(processId)) {
+ retVal = true;
+ }
+ return retVal;
+ }
+
+ public Iterator<DuccId> getKillDuccIds() {
+ return killProcessMap.keySet().iterator();
+ }
+
+ public void killJob(JobCompletionType jobCompletionType, IRationale jobCompletionRationale) {
+ killJob.getAndSet(true);
+ setJobCompletion(jobCompletionType, jobCompletionRationale);
+ }
+
+ public boolean isKillJob() {
+ return killJob.get();
+ }
+
+ public void limboAdd(int seqNo, DuccId pDuccId) {
+ limboMap.put(new Integer(seqNo), pDuccId);
+ }
+
+ public void limboRemove(int seqNo, DuccId pDuccId) {
+ limboMap.remove(new Integer(seqNo));
+ }
+
+ public ConcurrentHashMap<Integer,DuccId> getLimboMap() {
+ return limboMap;
+ }
+
+ public ConcurrentHashMap<String,DuccId> getCasQueuedMap() {
+ return casQueuedMap;
+ }
+
+ public int getWorkItemsQueued() {
+ return casQueuedMap.size();
+ }
+
+ public void workItemQueued(String casId, DuccId jobId) {
+ casQueuedMap.put(casId, jobId);
+ logReport();
+ }
+
+ public void workItemDequeued(String casId) {
+ casQueuedMap.remove(casId);
+ logReport();
+ }
+
+ public int getWorkItemsDispatched() {
+ return getWorkItemsQueued()+getWorkItemsOperating();
+ }
+
+ public void workItemPendingProcessAssignmentAdd(String casId) {
+ pendingProcessAssignmentMap.put(casId, casId);
+ }
+
+ public void workItemPendingProcessAssignmentRemove(String casId) {
+ pendingProcessAssignmentMap.remove(casId);
+ }
+
+ public int getWorkItemPendingProcessAssignmentCount() {
+ int retVal = 0;
+ retVal = pendingProcessAssignmentMap.size();
+ return retVal;
+ }
+
+ public boolean isWorkItemPendingProcessAssignment() {
+ boolean retVal = false;
+ if(getWorkItemPendingProcessAssignmentCount() > 0) {
+ retVal = true;
+ }
+ return retVal;
+ }
+
+ private static final String keyNodeIP = "nodeIP";
+ private static final String keyPID = "PID";
+
+ public void workItemOperatingStart(String casId, String nodeIP, String PID) {
+ HashMap<String,String> operatingCAS = new HashMap<String,String>();
+ operatingCAS.put(keyNodeIP, nodeIP);
+ operatingCAS.put(keyPID, PID);
+ casOperatingMap.put(casId, operatingCAS);
+ workItemDequeued(casId);
+ logReport();
+ }
+
+ public void workItemOperatingEnd(String casId) {
+ casOperatingMap.remove(casId);
+ logReport();
+ }
+
+ public boolean isOperating(String nodeIP, String PID) {
+ boolean retVal = false;
+ synchronized(this) {
+ Iterator<String> iterator = casOperatingMap.keySet().iterator();
+ while(iterator.hasNext()) {
+ String casId = iterator.next();
+ HashMap<String,String> casOperating = casOperatingMap.get(casId);
+ if(Util.compare(nodeIP,casOperating.get(keyNodeIP))) {
+ if(Util.compare(PID,casOperating.get(keyPID))) {
+ retVal = true;
+ }
+ }
+ }
+ }
+ return retVal;
+ }
+
+ public boolean isStarted() {
+ int started = getWorkItemsProcessingStarted();
+ return started > 0;
+ }
+
+ public boolean isProcessing() {
+ String methodName = "isProcessing";
+ synchronized(this) {
+ int fetched = getWorkItemsFetched();
+ int completed = getWorkItemsProcessingCompleted();
+ int error = getWorkItemsProcessingError();
+ boolean retVal = fetched != (completed + error);
+ duccOut.debug(methodName, jobid, "fetched:"+fetched+" "+"completed:"+completed+" "+"error:"+error);
+ return retVal;
+ }
+ }
+
+ public boolean isComplete() {
+ boolean retVal = false;
+ switch(getDriverState()) {
+ case Completing:
+ case Completed:
+ retVal = true;
+ break;
+ }
+ return retVal;
+ }
+
+ private void calculateState() {
+ String methodName = "calculateState";
+ switch(getDriverState()) {
+ case Initializing:
+ if(getAtLeastOneService()) {
+ setDriverState(DriverState.Running);
+ }
+ break;
+ case Idle:
+ if(isProcessing()) {
+ setDriverState(DriverState.Running);
+ }
+ else if(isPending()) {
+ setDriverState(DriverState.Idle);
+ }
+ else {
+ setDriverState(DriverState.Completing);
+ }
+ break;
+ case Running:
+ if(!isProcessing()) {
+ if(isPending()) {
+ setDriverState(DriverState.Idle);
+ }
+ else {
+ setDriverState(DriverState.Completing);
+ }
+ }
+ break;
+ case Completing:
+ if(isTerminateDriver()) {
+ setDriverState(DriverState.Completed);
+ if(getWorkItemsProcessingError() == 0) {
+ setJobCompletion(JobCompletionType.EndOfJob, new Rationale("job driver status reported as normal completion"));
+ }
+ }
+ break;
+ case Completed:
+ break;
+ }
+ duccOut.debug(methodName, duccId, "state:"+getDriverState());
+ }
+
+ public String getLogReport() {
+ return "state:"+driverState+" "+"threads:"+getThreadCount()+" "
+ +"total:"+getWorkItemsTotal()+" "
+ +"fetched:"+getWorkItemsFetched()+" "
+ +"started:"+getWorkItemsProcessingStarted()+" "
+ +"completed:"+getWorkItemsProcessingCompleted()+" "
+ +"error:"+getWorkItemsProcessingError()+" "
+ +"queued:"+getWorkItemsQueued()+" "
+ +"in-progress:"+casOperatingMap.size()+" "
+ +"pending:"+isPending()+" "
+ +"unassigned:"+getWorkItemPendingProcessAssignmentCount()+" "
+ +"retry:"+getWorkItemsRetry();
+ }
+
+ public void logReport() {
+ String methodName = "logReport";
+ duccOut.debug(methodName, duccId, getLogReport());
+ }
+
+ public DriverStatusReport deepCopy() {
+ return (DriverStatusReport)SerializationUtils.clone(this);
+ }
+
+ public void setPerWorkItemStatistics(IDuccPerWorkItemStatistics perWorkItemStatistics) {
+ this.perWorkItemStatistics = perWorkItemStatistics;
+ }
+
+ public IDuccPerWorkItemStatistics getPerWorkItemStatistics() {
+ return perWorkItemStatistics;
+ }
+
+ @Deprecated
+ public PerformanceMetricsSummaryMap getPerformanceMetricsSummaryMap() {
+ return performanceMetricsSummaryMap;
+ }
+
+ public IDuccUimaDeploymentDescriptor getUimaDeploymentDescriptor() {
+ return uimaDeploymentDescriptor;
+ }
+
+ public void setUimaDeploymentDescriptor(IDuccUimaDeploymentDescriptor uimaDeploymentDescriptor) {
+ this.uimaDeploymentDescriptor = uimaDeploymentDescriptor;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DriverStatusReport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DuccProcessWorkItemsMap.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DuccProcessWorkItemsMap.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DuccProcessWorkItemsMap.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DuccProcessWorkItemsMap.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.common.utils.id.DuccId;
+import org.apache.uima.ducc.transport.event.common.DuccProcessWorkItems;
+import org.apache.uima.ducc.transport.event.common.IDuccProcessWorkItems;
+
+
+public class DuccProcessWorkItemsMap extends ConcurrentHashMap<DuccId, IDuccProcessWorkItems> {
+
+ private static final long serialVersionUID = 1L;
+
+ private IDuccProcessWorkItems totals = new DuccProcessWorkItems();
+
+ public IDuccProcessWorkItems getTotals() {
+ return totals;
+ }
+
+ private IDuccProcessWorkItems get(DuccId duccId) {
+ IDuccProcessWorkItems retVal = super.get(duccId);
+ if(retVal == null) {
+ retVal = new DuccProcessWorkItems();
+ super.put(duccId, retVal);
+ }
+ return retVal;
+ }
+
+ public void done(DuccId processId, long time) {
+ get(processId).done(time);
+ getTotals().done(time);
+ }
+
+ public void dispatch(DuccId processId) {
+ get(processId).dispatch();
+ getTotals().dispatch();
+ }
+
+ public void error(DuccId processId) {
+ get(processId).error();
+ getTotals().error();
+ }
+
+ public void retry(DuccId processId) {
+ get(processId).retry();
+ getTotals().retry();
+ }
+
+ public void preempt(DuccId processId) {
+ get(processId).preempt();
+ getTotals().preempt();
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/DuccProcessWorkItemsMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/IDriverState.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/IDriverState.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/IDriverState.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/IDriverState.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+public interface IDriverState {
+
+ public enum DriverState {
+ NotRunning, // UIMA-AS client is not running
+ Initializing, // UIMA-AS client is initializing
+ Idle, // CAS processing inactive (and one or more not yet processed)
+ Running, // CAS processing active
+ Completing, // Driver processing is completing
+ Completed, // Driver processing is completed
+ Undefined // None of the above
+ };
+
+ public DriverState getJobState();
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/IDriverState.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryItem.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryItem.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryItem.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryItem.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PerformanceMetricsSummaryItem implements Serializable {
+ /**
+ * please increment this sUID when removing or modifying a field
+ */
+ private static final long serialVersionUID = 1L;
+
+ private String name;
+ private String uniqueName;
+
+ private AtomicLong analysisTime = new AtomicLong(0);
+ private AtomicLong numProcessed = new AtomicLong(0);
+
+ private AtomicLong analysisTimeMin = new AtomicLong(-1);
+ private AtomicLong analysisTimeMax = new AtomicLong(-1);
+
+ public PerformanceMetricsSummaryItem(String name, String uniqueName) {
+ this.name = name;
+ this.uniqueName = uniqueName;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getUniqueName() {
+ return uniqueName;
+ }
+
+ public long getAnalysisTime() {
+ return analysisTime.get();
+ }
+
+ public long getAnalysisTimeMin() {
+ return analysisTimeMin.get();
+ }
+
+ public long getAnalysisTimeMax() {
+ return analysisTimeMax.get();
+ }
+
+ public long getNumProcessed() {
+ return numProcessed.get();
+ }
+
+ //
+
+ private void updateAnalysisTimeMin(long delta) {
+ long currentValue = analysisTimeMin.get();
+ if(currentValue < 0) {
+ analysisTimeMin.compareAndSet(currentValue, delta);
+ currentValue = analysisTimeMin.get();
+ }
+ while(currentValue > delta) {
+ analysisTimeMin.compareAndSet(currentValue, delta);
+ currentValue = analysisTimeMin.get();
+ }
+ }
+
+ private void updateAnalysisTimeMax(long delta) {
+ long currentValue = analysisTimeMax.get();
+ if(currentValue < 0) {
+ analysisTimeMax.compareAndSet(currentValue, delta);
+ currentValue = analysisTimeMax.get();
+ }
+ while(currentValue < delta) {
+ analysisTimeMax.compareAndSet(currentValue, delta);
+ currentValue = analysisTimeMax.get();
+ }
+ }
+
+ public long addAndGetAnalysisTime(long delta) {
+ updateAnalysisTimeMin(delta);
+ updateAnalysisTimeMax(delta);
+ return analysisTime.addAndGet(delta);
+ }
+
+ public long addAndGetNumProcessed(long delta) {
+ return numProcessed.addAndGet(delta);
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryItem.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryMap.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryMap.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryMap.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryMap.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.uima.aae.monitor.statistics.AnalysisEnginePerformanceMetrics;
+import org.apache.uima.ducc.common.utils.DuccLogger;
+
+
+public class PerformanceMetricsSummaryMap implements Serializable {
+
+ /**
+ * please increment this sUID when removing or modifying a field
+ */
+ private static final long serialVersionUID = 1L;
+
+ private ConcurrentHashMap<String,PerformanceMetricsSummaryItem> map = new ConcurrentHashMap<String,PerformanceMetricsSummaryItem>();
+
+ private AtomicInteger casCount = new AtomicInteger(0);
+
+ private String getKey(AnalysisEnginePerformanceMetrics item) {
+ String key = "?";
+ try {
+ String uniqueName = item.getUniqueName();
+ String delim = "Components,";
+ key = delim+uniqueName.split(delim,2)[1];
+ }
+ catch(Throwable t) {
+ }
+ return key;
+ }
+
+ private void addEntry(String key, String name) {
+ synchronized(map) {
+ if(!map.containsKey(key)) {
+ PerformanceMetricsSummaryItem summaryItem = new PerformanceMetricsSummaryItem(name,key);
+ map.put(key, summaryItem);
+ }
+ }
+ }
+
+ /**
+ * For each unique name in completed work item's performance metrics list:
+ *
+ * 1. accumulate analysis time
+ * 2. accumulate number processed
+ *
+ * Also, accumulate number of (CR provided) CASes processed.
+ *
+ */
+ public void update(DuccLogger duccLogger, List<AnalysisEnginePerformanceMetrics> list) {
+ String methodName = "update";
+ int count = casCount.addAndGet(1);
+ for(AnalysisEnginePerformanceMetrics item : list ) {
+ String key = getKey(item);
+ String name = item.getName();
+ addEntry(key,name);
+ PerformanceMetricsSummaryItem summaryItem = map.get(key);
+ synchronized(map) {
+ long timeBefore = summaryItem.getAnalysisTime();
+ long timeItem = item.getAnalysisTime();
+ long timeAfter = summaryItem.addAndGetAnalysisTime(item.getAnalysisTime());
+ long numbBefore = summaryItem.getNumProcessed();
+ long numbItem = item.getNumProcessed();
+ long numbAfter = summaryItem.addAndGetNumProcessed(item.getNumProcessed());
+ if(duccLogger != null) {
+ String t0 = "count:"+count;
+ String t1 = "Numb before:"+numbBefore+" item:"+numbItem+" after:"+numbAfter;
+ String t2 = "Time before:"+timeBefore+" item:"+timeItem+" after:"+timeAfter;
+ String text = t0+" "+t1+" "+t2;
+ duccLogger.debug(methodName, null, text);
+ }
+ }
+ }
+ }
+
+ public void update(List<AnalysisEnginePerformanceMetrics> list) {
+ update(null, list);
+ }
+
+ public Set<Entry<String, PerformanceMetricsSummaryItem>> entrySet() {
+ return map.entrySet();
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ public int casCount() {
+ return casCount.get();
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceMetricsSummaryMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummary.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummary.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummary.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummary.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+public class PerformanceSummary extends PerformanceSummaryWriter {
+
+ public PerformanceSummary(String dirname) {
+ super(dirname);
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummary.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryBase.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryBase.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryBase.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryBase.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import org.apache.uima.ducc.common.utils.IOHelper;
+
+public class PerformanceSummaryBase {
+
+ public static final String job_performance_summary_ser = "job-performance-summary.ser";
+
+ protected String filename = null;
+ protected PerformanceMetricsSummaryMap summaryMap = null;
+
+ protected PerformanceSummaryBase(String dirname) {
+ init(dirname);
+ }
+
+ protected void init(String dirname) {
+ this.filename = IOHelper.marryDir2File(dirname,job_performance_summary_ser);
+ this.summaryMap = new PerformanceMetricsSummaryMap();
+ }
+
+ public PerformanceMetricsSummaryMap getSummaryMap() {
+ return summaryMap;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryBase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryReader.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryReader.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryReader.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryReader.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+public class PerformanceSummaryReader extends PerformanceSummaryBase {
+
+ public PerformanceSummaryReader(String dirname) {
+ super(dirname);
+ }
+
+ public PerformanceMetricsSummaryMap readSummary() {
+ PerformanceMetricsSummaryMap map = null;
+ try {
+ FileInputStream fis = new FileInputStream(filename);
+ ObjectInputStream in = new ObjectInputStream(fis);
+ summaryMap = (PerformanceMetricsSummaryMap)in.readObject();
+ in.close();
+ map = getSummaryMap();
+ }
+ catch(Exception e) {
+ System.err.println("PerformanceMetricsSummaryMap.readSummary() could not read file: "+ filename);
+ }
+ return map;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryWriter.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryWriter.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryWriter.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryWriter.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
+
+public class PerformanceSummaryWriter extends PerformanceSummaryReader {
+
+ public PerformanceSummaryWriter(String dirname) {
+ super(dirname);
+ }
+
+ public void writeSummary() {
+ try {
+ FileOutputStream fos = new FileOutputStream(filename);
+ ObjectOutputStream out = new ObjectOutputStream(fos);
+ out.writeObject(summaryMap);
+ out.close();
+ return;
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/PerformanceSummaryWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/UimaStatistic.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/UimaStatistic.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/UimaStatistic.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/UimaStatistic.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+public class UimaStatistic implements Comparable<UimaStatistic> {
+
+ private String shortName;
+ private long analysisTime;
+ private long analysisMinTime;
+ private long analysisMaxTime;
+ private String longName;
+
+ public UimaStatistic (String shortName, String longName, long analysisTime, long anMinTime, long anMaxTime) {
+ this.shortName = shortName;
+ this.analysisTime = analysisTime;
+ this.longName = longName;
+ this.analysisMinTime = anMinTime;
+ this.analysisMaxTime = anMaxTime;
+ }
+
+ @Override
+ public int compareTo(UimaStatistic other) {
+ return - Long.signum(analysisTime - other.analysisTime);
+ }
+
+ @Override
+ public String toString() {
+// return "UimaStatistic [name=" + shortName + ", analysisTime=" + analysisTime
+// + ", longName=" + longName + "]";
+ return String.format(" %s: %.2f",shortName, analysisTime/(1000.0*ViewJobPerformanceSummary.cascount));
+ }
+
+ public String getShortName() {
+ return shortName;
+ }
+
+ public long getAnalysisTime() {
+ return analysisTime;
+ }
+
+ public long getAnalysisMinTime() {
+ return analysisMinTime;
+ }
+
+ public long getAnalysisMaxTime() {
+ return analysisMaxTime;
+ }
+
+ public String getLongName() {
+ return longName;
+ }
+
+ public String getToolTip() {
+ return shortName + " ("+ longName + ")";
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/UimaStatistic.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/ViewJobPerformanceSummary.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/ViewJobPerformanceSummary.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/ViewJobPerformanceSummary.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/ViewJobPerformanceSummary.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.jd;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map.Entry;
+
+public class ViewJobPerformanceSummary {
+
+ public static int cascount;
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+ if (args.length<1) {
+ System.out.println("this command takes one arg: job-log-dir");
+ System.exit(1);
+ }
+ PerformanceSummary performanceSummary = new PerformanceSummary(args[0]);
+ PerformanceMetricsSummaryMap performanceMetricsSummaryMap = performanceSummary.readSummary();
+ if (performanceMetricsSummaryMap == null || performanceMetricsSummaryMap.size() == 0) {
+ System.err.println("Null map");
+ System.exit(1);
+ }
+ cascount = performanceMetricsSummaryMap.casCount();
+ ArrayList <UimaStatistic> uimaStats = new ArrayList<UimaStatistic>();
+ uimaStats.clear();
+ long analysisTime = 0;
+ try {
+ for (Entry<String, PerformanceMetricsSummaryItem> entry : performanceMetricsSummaryMap.entrySet()) {
+ String key = entry.getKey();
+ int posName = key.lastIndexOf('=');
+ long anTime = entry.getValue().getAnalysisTime();
+ long anMinTime = entry.getValue().getAnalysisTimeMin();
+ long anMaxTime = entry.getValue().getAnalysisTimeMax();
+ analysisTime += anTime;
+ if (posName > 0) {
+ String shortname = key.substring(posName+1);
+ UimaStatistic stat = new UimaStatistic(shortname,
+ entry.getKey(), anTime, anMinTime, anMaxTime);
+ uimaStats.add(stat);
+ }
+ }
+ Collections.sort(uimaStats);
+ int numstats = uimaStats.size();
+ System.out.println("Job = "+args[0]);
+ System.out.printf("Processed %d workitems, Average time = %.1f seconds%n", cascount, analysisTime/(1000.0*cascount));
+ System.out.println("Component breakdown (ave time per workitem in sec):");
+ for (int i = 0; i < numstats; ++i) {
+ System.out.println(uimaStats.get(i).toString());
+ }
+ } catch (Exception e) {
+ System.err.println("Problem parsing PerformanceMetricSummaryMap");
+ }
+ }
+
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/jd/ViewJobPerformanceSummary.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/rm/IResource.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/rm/IResource.java?rev=1427978&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/rm/IResource.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/rm/IResource.java Wed Jan 2 19:49:53 2013
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.transport.event.rm;
+
+import java.io.Serializable;
+
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+
+/**
+ * This interface defines exactly one "resource" or "share". The ID is unique within the specific machine, so that
+ * if two resource instances are found on the same machine, they can be uniquely identified.
+ */
+public interface IResource extends Serializable
+{
+ /**
+ * Returns the unique id, of the share. Share IDs last only as long as they're assigned to a job and
+ * won't be reused once they are reclaimed (the job vacates the share).
+ */
+ DuccId getId();
+
+ /**
+ * Returns the node identity where the share resides.
+ */
+ NodeIdentity getNodeId(); // The node where this resource resides, as provided by the Node Agent
+
+ /**
+ * If true, this share has been purged because its node went AWOL.
+ */
+ boolean isPurged();
+
+ /**
+ * Returns the number of quantum shares this resource occupies.
+ */
+ int countShares();
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/rm/IResource.java
------------------------------------------------------------------------------
svn:eol-style = native