You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by de...@apache.org on 2015/02/21 16:12:49 UTC
svn commit: r1661378 - in
/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src:
main/java/org/apache/uima/ducc/container/common/
main/java/org/apache/uima/ducc/container/jd/blacklist/
main/java/org/apache/uima/ducc/container/jd/cas/ main/java/org/apac...
Author: degenaro
Date: Sat Feb 21 15:12:48 2015
New Revision: 1661378
URL: http://svn.apache.org/r1661378
Log:
UIMA-4069 DUCC Job Driver (JD) system classpath
blacklist nodes that OR has reported as down whose JPs continue to request work items
add "hints" to JP response: Process-Blacklisted, Job-Killed, WorkItems-Exhausted
Added:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java (with props)
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java (with props)
Modified:
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/RemoteWorkerProcess.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/MetaCasTransaction.java
uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/common/Standardize.java Sat Feb 21 15:12:48 2015
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.container.c
public class Standardize {
public enum Label {
+ text,
limit,
classname,
exception,
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java?rev=1661378&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java Sat Feb 21 15:12:48 2015
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.jd.blacklist;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+
+public class JobProcessBlacklist {
+
+ private static ILogger logger = Logger.getLogger(JobProcessBlacklist.class, IComponent.Id.JD.name());
+
+ private static ConcurrentHashMap<IRemoteWorkerProcess,Long> map = new ConcurrentHashMap<IRemoteWorkerProcess,Long>();
+
+ private static JobProcessBlacklist instance = new JobProcessBlacklist();
+
+ public static JobProcessBlacklist getInstance() {
+ return instance;
+ }
+
+ private static boolean disabled = false;
+
+ public void add(IRemoteWorkerProcess rwp) {
+ String location = "add";
+ if(!disabled) {
+ if(rwp != null) {
+ if(!map.containsKey(rwp)) {
+ Long time = new Long(System.currentTimeMillis());
+ map.put(rwp, time);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
+ }
+ }
+
+ public void remove(IRemoteWorkerProcess rwp) {
+ String location = "remove";
+ if(!disabled) {
+ if(rwp != null) {
+ map.remove(rwp);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
+ }
+
+ public boolean includes(IRemoteWorkerProcess rwp) {
+ String location = "includes";
+ boolean retVal = false;
+ if(!disabled) {
+ if(rwp != null) {
+ retVal = map.containsKey(rwp);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ mb.append(Standardize.Label.size.get()+map.size());
+ mb.append(Standardize.Label.value.get()+retVal);
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
+ return retVal;
+ }
+
+ public void disable() {
+ disabled = true;
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/blacklist/JobProcessBlacklist.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/cas/CasManagerStats.java Sat Feb 21 15:12:48 2015
@@ -40,6 +40,14 @@ public class CasManagerStats {
private ConcurrentHashMap<String,AtomicInteger> retryReasonsMap = new ConcurrentHashMap<String,AtomicInteger>();
+ public boolean isExhausted() {
+ boolean retVal = false;
+ if(getCrTotal() == getEnded()) {
+ retVal = true;
+ }
+ return retVal;
+ }
+
public int getUnfinishedWorkCount() {
return crTotal.get() - getEnded();
}
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/fsm/wi/ActionGet.java Sat Feb 21 15:12:48 2015
@@ -18,11 +18,12 @@
*/
package org.apache.uima.ducc.container.jd.fsm.wi;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.uima.ducc.common.jd.files.workitem.IWorkItemStateKeeper;
import org.apache.uima.ducc.container.common.MessageBuffer;
import org.apache.uima.ducc.container.common.MetaCasHelper;
+import org.apache.uima.ducc.container.common.Standardize;
import org.apache.uima.ducc.container.common.fsm.iface.IAction;
import org.apache.uima.ducc.container.common.fsm.iface.IEvent;
import org.apache.uima.ducc.container.common.fsm.iface.IFsm;
@@ -31,6 +32,7 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.common.logger.Logger;
import org.apache.uima.ducc.container.jd.JobDriver;
import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.blacklist.JobProcessBlacklist;
import org.apache.uima.ducc.container.jd.cas.CasManager;
import org.apache.uima.ducc.container.jd.log.LoggerHelper;
import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
@@ -43,13 +45,17 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.wi.WiTracker;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Hint;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.JdState;
+import org.apache.uima.ducc.container.net.impl.TransactionHelper;
public class ActionGet implements IAction {
private static Logger logger = Logger.getLogger(ActionGet.class, IComponent.Id.JD.name());
-
- private AtomicBoolean warned = new AtomicBoolean(false);
+
+ private ConcurrentHashMap<IRemoteWorkerProcess, Long> warnedJobDiscontinued = new ConcurrentHashMap<IRemoteWorkerProcess, Long>();
+ private ConcurrentHashMap<IRemoteWorkerProcess, Long> warnedProcessDiscontinued = new ConcurrentHashMap<IRemoteWorkerProcess, Long>();
+ private ConcurrentHashMap<IRemoteWorkerProcess, Long> warnedExhausted = new ConcurrentHashMap<IRemoteWorkerProcess, Long>();
@Override
public String getName() {
@@ -74,12 +80,28 @@ public class ActionGet implements IActio
jd.advanceJdState(JdState.Active);
CasManager cm = jd.getCasManager();
IMetaCas metaCas = null;
+ JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
if(cm.getCasManagerStats().isKillJob()) {
- if(!warned.getAndSet(true)) {
+ if(!warnedJobDiscontinued.containsKey(rwp)) {
+ MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ mb.append(Standardize.Label.text.get()+"job discontinued");
+ logger.warn(location, ILogger.null_id, mb.toString());
+ warnedJobDiscontinued.put(rwp, new Long(System.currentTimeMillis()));
+ }
+ TransactionHelper.addResponseHint(trans, Hint.Killed);
+ }
+ else if(jobProcessBlacklist.includes(rwp)) {
+ if(!warnedProcessDiscontinued.containsKey(rwp)) {
MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
- mb.append("this and future requests refused due to pending kill job");
- logger.info(location, ILogger.null_id, mb.toString());
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ mb.append(Standardize.Label.text.get()+"process discontinued");
+ logger.warn(location, ILogger.null_id, mb.toString());
+ warnedProcessDiscontinued.put(rwp, new Long(System.currentTimeMillis()));
}
+ TransactionHelper.addResponseHint(trans, Hint.Blacklisted);
}
else {
metaCas = cm.getMetaCas();
@@ -114,6 +136,17 @@ public class ActionGet implements IActio
MessageBuffer mb = LoggerHelper.getMessageBuffer(actionData);
mb.append("No CAS found for processing");
logger.info(location, ILogger.null_id, mb.toString());
+ if(cm.getCasManagerStats().isExhausted()) {
+ if(!warnedExhausted.containsKey(rwp)) {
+ MessageBuffer mbx = LoggerHelper.getMessageBuffer(actionData);
+ mbx.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mbx.append(Standardize.Label.pid.get()+rwp.getPid());
+ mbx.append(Standardize.Label.text.get()+"all CASes processed");
+ logger.info(location, ILogger.null_id, mbx.toString());
+ warnedExhausted.put(rwp, new Long(System.currentTimeMillis()));
+ }
+ TransactionHelper.addResponseHint(trans, Hint.Exhausted);
+ }
}
//
fsm.transition(event, actionData);
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/MessageHandler.java Sat Feb 21 15:12:48 2015
@@ -19,9 +19,9 @@
package org.apache.uima.ducc.container.jd.mh;
import java.io.File;
+import java.util.ArrayList;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,14 +37,15 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.dgen.DgenManager;
import org.apache.uima.ducc.container.jd.JobDriver;
import org.apache.uima.ducc.container.jd.JobDriverHelper;
+import org.apache.uima.ducc.container.jd.blacklist.JobProcessBlacklist;
import org.apache.uima.ducc.container.jd.cas.CasManager;
import org.apache.uima.ducc.container.jd.cas.CasManagerStats;
import org.apache.uima.ducc.container.jd.fsm.wi.ActionData;
import org.apache.uima.ducc.container.jd.fsm.wi.WiFsm;
import org.apache.uima.ducc.container.jd.mh.iface.INodeInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo;
-import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo.CompletionType;
+import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.jd.mh.impl.OperatingInfo;
import org.apache.uima.ducc.container.jd.wi.IRunningWorkItemStatistics;
@@ -55,6 +56,7 @@ import org.apache.uima.ducc.container.jd
import org.apache.uima.ducc.container.jd.wi.perf.IWorkItemPerformanceKeeper;
import org.apache.uima.ducc.container.net.iface.IMetaCas;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Hint;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.JdState;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Type;
@@ -65,8 +67,6 @@ public class MessageHandler implements I
private AtomicInteger gets = new AtomicInteger(0);
private AtomicInteger acks = new AtomicInteger(0);
- private ConcurrentLinkedQueue<IRemoteWorkerThread> legacyList = new ConcurrentLinkedQueue<IRemoteWorkerThread>();
-
private ConcurrentHashMap<String,String> failedInitializationMap = new ConcurrentHashMap<String,String>();
private ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread> wipMap = new ConcurrentHashMap<IRemoteWorkerThread,IRemoteWorkerThread>();
@@ -203,17 +203,19 @@ public class MessageHandler implements I
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
- if(legacyList.contains(rwt)) {
+ RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
+ if(jobProcessBlacklist.includes(rwp)) {
MessageBuffer mb1 = new MessageBuffer();
mb1.append(Standardize.Label.remote.get()+rwt.toString());
mb1.append(Standardize.Label.status.get()+"already kaput");
logger.trace(location, ILogger.null_id, mb1.toString());
}
else {
- legacyList.add(rwt);
+ jobProcessBlacklist.add(rwp);
MessageBuffer mb1 = new MessageBuffer();
mb1.append(Standardize.Label.remote.get()+rwt.toString());
mb1.append(Standardize.Label.status.get()+"transition to down");
@@ -248,17 +250,19 @@ public class MessageHandler implements I
mb.append(Standardize.Label.pid.get()+processInfo.getPid());
logger.trace(location, ILogger.null_id, mb.toString());
ConcurrentHashMap<IRemoteWorkerThread, IWorkItem> map = JobDriver.getInstance().getRemoteThreadMap();
+ JobProcessBlacklist jobProcessBlacklist = JobProcessBlacklist.getInstance();
for(Entry<IRemoteWorkerThread, IWorkItem> entry : map.entrySet()) {
IRemoteWorkerThread rwt = entry.getKey();
if(rwt.comprises(processInfo)) {
- if(legacyList.contains(rwt)) {
+ RemoteWorkerProcess rwp = new RemoteWorkerProcess(rwt);
+ if(jobProcessBlacklist.includes(rwp)) {
MessageBuffer mb1 = new MessageBuffer();
mb1.append(Standardize.Label.remote.get()+rwt.toString());
mb1.append(Standardize.Label.status.get()+"already kaput");
logger.trace(location, ILogger.null_id, mb1.toString());
}
else {
- legacyList.add(rwt);
+ jobProcessBlacklist.add(rwp);
MessageBuffer mb1 = new MessageBuffer();
mb1.append(Standardize.Label.remote.get()+rwt.toString());
mb1.append(Standardize.Label.status.get()+"transition to down");
@@ -388,6 +392,7 @@ public class MessageHandler implements I
String location = "handleMetaCasTransation";
RemoteWorkerThread rwt = null;
try {
+ trans.setResponseHints(new ArrayList<Hint>());
rwt = new RemoteWorkerThread(trans);
block(rwt);
MessageBuffer mb = new MessageBuffer();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/RemoteWorkerProcess.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/RemoteWorkerProcess.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/RemoteWorkerProcess.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/jd/mh/RemoteWorkerProcess.java Sat Feb 21 15:12:48 2015
@@ -22,6 +22,7 @@ import org.apache.uima.ducc.container.co
import org.apache.uima.ducc.container.common.logger.ILogger;
import org.apache.uima.ducc.container.common.logger.Logger;
import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerThread;
import org.apache.uima.ducc.container.net.iface.IMetaCasRequester;
public class RemoteWorkerProcess implements IRemoteWorkerProcess {
@@ -49,6 +50,13 @@ public class RemoteWorkerProcess impleme
setPid(pid);
}
+ public RemoteWorkerProcess(IRemoteWorkerThread rwt) {
+ setNodeName(rwt.getNodeName());
+ setNodeAddress(rwt.getNodeAddress());
+ setPidName(rwt.getPidName());
+ setPid(rwt.getPid());
+ }
+
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/iface/IMetaCasTransaction.java Sat Feb 21 15:12:48 2015
@@ -19,6 +19,7 @@
package org.apache.uima.ducc.container.net.iface;
import java.io.Serializable;
+import java.util.List;
import org.apache.uima.ducc.container.net.impl.TransactionId;
@@ -28,6 +29,15 @@ public interface IMetaCasTransaction ext
public Type getType();
public void setType(Type value);
+
+ public enum Hint {
+ Blacklisted, // the requesting JP has been blacklisted (no workitems will ever be assigned)
+ Killed, // the JD has been killed
+ Exhausted, // the workitems have all been processed (successfully or otherwise)
+ };
+
+ public List<Hint> getResponseHints();
+ public void setResponseHints(List<Hint> value);
public enum Direction { Request, Response };
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/MetaCasTransaction.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/MetaCasTransaction.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/MetaCasTransaction.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/MetaCasTransaction.java Sat Feb 21 15:12:48 2015
@@ -18,6 +18,8 @@
*/
package org.apache.uima.ducc.container.net.impl;
+import java.util.List;
+
import org.apache.uima.ducc.container.net.iface.IMetaCas;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
@@ -30,6 +32,8 @@ public class MetaCasTransaction implemen
private Type type = null;
private Direction direction = null;
+ private List<Hint> hints = null;
+
private String providerKey = null;
private String providerName = null;
private int providerPort = 0;
@@ -149,6 +153,16 @@ public class MetaCasTransaction implemen
}
@Override
+ public List<Hint> getResponseHints() {
+ return hints;
+ }
+
+ @Override
+ public void setResponseHints(List<Hint> value) {
+ hints = value;
+ }
+
+ @Override
public TransactionId getTransactionId() {
return transactionId;
}
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java?rev=1661378&view=auto
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java (added)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java Sat Feb 21 15:12:48 2015
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+package org.apache.uima.ducc.container.net.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.uima.ducc.container.common.MessageBuffer;
+import org.apache.uima.ducc.container.common.Standardize;
+import org.apache.uima.ducc.container.common.logger.IComponent;
+import org.apache.uima.ducc.container.common.logger.ILogger;
+import org.apache.uima.ducc.container.common.logger.Logger;
+import org.apache.uima.ducc.container.jd.mh.RemoteWorkerProcess;
+import org.apache.uima.ducc.container.jd.mh.iface.remote.IRemoteWorkerProcess;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
+import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Hint;
+
+public class TransactionHelper {
+
+ private static Logger logger = Logger.getLogger(TransactionHelper.class, IComponent.Id.JD.name());
+
+ public static void addResponseHint(IMetaCasTransaction trans, Hint hint) {
+ String location = "addResponseHint";
+ if(trans != null) {
+ if(hint != null) {
+ List<Hint> hints = trans.getResponseHints();
+ if(hints == null) {
+ hints = new ArrayList<Hint>();
+ trans.setResponseHints(hints);
+ }
+ if(!hints.contains(hint)) {
+ hints.add(hint);
+ IRemoteWorkerProcess rwp = new RemoteWorkerProcess(trans);
+ MessageBuffer mb = new MessageBuffer();
+ mb.append(Standardize.Label.node.get()+rwp.getNodeName());
+ mb.append(Standardize.Label.pid.get()+rwp.getPid());
+ mb.append(Standardize.Label.value.get()+hint);
+ logger.debug(location, ILogger.null_id, mb.toString());
+ }
+ }
+ }
+ }
+}
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/net/impl/TransactionHelper.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java
URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java?rev=1661378&r1=1661377&r2=1661378&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java (original)
+++ uima/sandbox/uima-ducc/trunk/uima-ducc-container/src/test/java/org/apache/uima/ducc/container/jd/test/TestMessageHandler.java Sat Feb 21 15:12:48 2015
@@ -28,6 +28,7 @@ import java.util.Random;
import org.apache.uima.ducc.common.container.FlagsHelper;
import org.apache.uima.ducc.container.jd.JobDriver;
+import org.apache.uima.ducc.container.jd.blacklist.JobProcessBlacklist;
import org.apache.uima.ducc.container.jd.mh.MessageHandler;
import org.apache.uima.ducc.container.jd.mh.iface.IOperatingInfo;
import org.apache.uima.ducc.container.jd.mh.iface.IProcessInfo;
@@ -47,6 +48,11 @@ public class TestMessageHandler extends
private boolean enforce = true;
+ public void setUp() throws Exception {
+ super.setUp();
+ JobProcessBlacklist.getInstance().disable();
+ }
+
private MetaCasTransaction create(String node, int pid, int tid, Type type) {
MetaCasTransaction mct = new MetaCasTransaction();
mct.setRequesterNodeName(node);