You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by cw...@apache.org on 2019/08/02 17:30:47 UTC
svn commit: r1864248 [7/7] - in
/uima/uima-ducc/trunk/uima-ducc-pullservice/src:
main/java/org/apache/uima/ducc/ps/
main/java/org/apache/uima/ducc/ps/net/iface/
main/java/org/apache/uima/ducc/ps/net/impl/
main/java/org/apache/uima/ducc/ps/sd/ main/java...
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/test/ae/NoOpAE.java Fri Aug 2 17:30:46 2019
@@ -36,115 +36,114 @@ import org.apache.uima.resource.Resource
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
+public class NoOpAE extends CasAnnotator_ImplBase {
+ Logger logger;
-public class NoOpAE extends CasAnnotator_ImplBase
-{
- Logger logger;
- static boolean initComplete = false;
- String AE_Identifier = "*^^^^^^^^^ AE ";
- private static AtomicLong processCount = new AtomicLong();
- String errorSequence;
-
- @Override
- public void initialize(UimaContext uimaContext) throws ResourceInitializationException
- {
- processCount.set(0);
- super.initialize(uimaContext);
- errorSequence = System.getProperty("ProcessFail");
- if ( Objects.isNull(errorSequence)) {
- errorSequence="";
- }
-
- long tid = Thread.currentThread().getId();
-
- Map<String, String> env = System.getenv();
- RuntimeMXBean rmxb = ManagementFactory.getRuntimeMXBean();
- String pid = rmxb.getName();
-
- logger = UIMAFramework.getLogger(NoOpAE.class);
- if ( logger == null ) {
- System.out.println("Is this nuts or what, no logger!");
- }
+ static boolean initComplete = false;
+
+ String AE_Identifier = "*^^^^^^^^^ AE ";
+
+ private static AtomicLong processCount = new AtomicLong();
- if ( initComplete ) {
- logger.log(Level.INFO, "Init bypassed in PID:TID " + pid + ":" + tid + ", already completed. ");
- return;
- } else {
- if ( logger != null )
- logger.log(Level.INFO, "Init procedes in PID:TIDs " + pid + ":" + tid + " Environment:");
- File workingdir = new File(System.getProperty("user.dir"));
- File[] files = workingdir.listFiles();
- if ( logger != null )
- logger.log(Level.INFO, "Working directory " + workingdir.toString() + " has " + files.length + " files.");
- }
-
-
- if ( logger != null )
- logger.log(Level.INFO, "^^-------> AE process " + pid + " TID " + tid + " initialization OK");
- return;
+ String errorSequence;
+
+ @Override
+ public void initialize(UimaContext uimaContext) throws ResourceInitializationException {
+ processCount.set(0);
+ super.initialize(uimaContext);
+ errorSequence = System.getProperty("ProcessFail");
+ if (Objects.isNull(errorSequence)) {
+ errorSequence = "";
}
+ long tid = Thread.currentThread().getId();
- void dolog(Object ... args)
- {
- StringBuffer sb = new StringBuffer();
- for ( Object s : args ) {
- sb.append(s);
- sb.append(" ");
- }
- String s = sb.toString();
- System.out.println("FROM PRINTLN: " + s);
- if ( logger != null )
- logger.log(Level.INFO, "FROM LOGGER:" + s);
+ Map<String, String> env = System.getenv();
+ RuntimeMXBean rmxb = ManagementFactory.getRuntimeMXBean();
+ String pid = rmxb.getName();
+
+ logger = UIMAFramework.getLogger(NoOpAE.class);
+ if (logger == null) {
+ System.out.println("Is this nuts or what, no logger!");
}
- public void destroy()
- {
- System.out.println(AE_Identifier + " Destroy is called (0)");
- dolog("Destroy is called (1) !");
- try {
- Thread.sleep(3000); // simulate actual work being done here
- } catch (InterruptedException e) {
+ if (initComplete) {
+ logger.log(Level.INFO,
+ "Init bypassed in PID:TID " + pid + ":" + tid + ", already completed. ");
+ return;
+ } else {
+ if (logger != null)
+ logger.log(Level.INFO, "Init procedes in PID:TIDs " + pid + ":" + tid + " Environment:");
+ File workingdir = new File(System.getProperty("user.dir"));
+ File[] files = workingdir.listFiles();
+ if (logger != null)
+ logger.log(Level.INFO,
+ "Working directory " + workingdir.toString() + " has " + files.length + " files.");
+ }
+
+ if (logger != null)
+ logger.log(Level.INFO, "^^-------> AE process " + pid + " TID " + tid + " initialization OK");
+ return;
+ }
+
+ void dolog(Object... args) {
+ StringBuffer sb = new StringBuffer();
+ for (Object s : args) {
+ sb.append(s);
+ sb.append(" ");
+ }
+ String s = sb.toString();
+ System.out.println("FROM PRINTLN: " + s);
+ if (logger != null)
+ logger.log(Level.INFO, "FROM LOGGER:" + s);
+ }
+
+ public void destroy() {
+ System.out.println(AE_Identifier + " Destroy is called (0)");
+ dolog("Destroy is called (1) !");
+ try {
+ Thread.sleep(3000); // simulate actual work being done here
+ } catch (InterruptedException e) {
+ }
+ System.out.println(AE_Identifier + " Destroy exits");
+ }
+
+ @Override
+ public void process(CAS cas) throws AnalysisEngineProcessException {
+ long val = processCount.incrementAndGet();
+ // String data = cas.getSofaDataString();
+ String[] errors = errorSequence.split(",");
+ synchronized (NoOpAE.class) {
+ for (String inx : errors) {
+ if (inx != null && inx.trim().length() > 0) {
+ long errorSeq = Long.parseLong(inx.trim());
+ if (errorSeq == val) {
+ System.out.println(">>>> Error: errorSeq:" + errorSeq + " processCount:" + val);
+ throw new AnalysisEngineProcessException(new RuntimeException("Simulated Exception"));
+ }
+
}
- System.out.println(AE_Identifier + " Destroy exits");
+ }
+ }
+ try {
+ // int n = getRandomNumberInRange(2000,3000);
+ int n = getRandomNumberInRange(250, 450);
+ // System.out.println(" AE Sleeping for "+n + " millis");
+ Thread.sleep(n);
+
+ } catch (InterruptedException e) {
+
+ }
+ }
+
+ private static int getRandomNumberInRange(int min, int max) {
+
+ if (min >= max) {
+ throw new IllegalArgumentException("max must be greater than min");
}
- @Override
- public void process(CAS cas) throws AnalysisEngineProcessException
- {
- long val = processCount.incrementAndGet();
- //String data = cas.getSofaDataString();
- String[] errors = errorSequence.split(",");
- synchronized(NoOpAE.class) {
- for( String inx : errors) {
- if ( inx != null && inx.trim().length() > 0 ) {
- long errorSeq = Long.parseLong(inx.trim());
- if ( errorSeq == val ) {
- System.out.println(">>>> Error: errorSeq:"+errorSeq+" processCount:"+val);
- throw new AnalysisEngineProcessException(new RuntimeException("Simulated Exception"));
- }
-
- }
- }
- }
- try {
- //int n = getRandomNumberInRange(2000,3000);
- int n = getRandomNumberInRange(250,450);
- //System.out.println(" AE Sleeping for "+n + " millis");
- Thread.sleep(n);
-
- } catch( InterruptedException e) {
-
- }
- }
- private static int getRandomNumberInRange(int min, int max) {
-
- if (min >= max) {
- throw new IllegalArgumentException("max must be greater than min");
- }
-
- Random r = new Random();
- return r.nextInt((max - min) + 1) + min;
- }
+ Random r = new Random();
+ return r.nextInt((max - min) + 1) + min;
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitProtocolHandlerTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitProtocolHandlerTestCase.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitProtocolHandlerTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitProtocolHandlerTestCase.java Fri Aug 2 17:30:46 2019
@@ -47,133 +47,137 @@ import org.junit.Before;
public class JunitProtocolHandlerTestCase extends Client {
- CountDownLatch threadsReady;
- CountDownLatch stopLatch;
- @Before
- public void setUp() throws Exception {
- System.setProperty(AbstractServiceProcessor.CLASSPATH_SWITCH_PROP,"true");
- }
- @After
- public void tearDown() throws Exception {
-
- }
- private IServiceTransport initializeTransport() throws Exception {
- int scaleout = 1;
- ITargetURI targetUrl = new HttpTargetURI("http://localhost:"+super.getPort()+"/"+super.getApp());
- DefaultRegistryClient registryClient =
- new DefaultRegistryClient(targetUrl);
-
- HttpServiceTransport transport =
- new HttpServiceTransport(registryClient, scaleout);
- transport.initialize();
- return transport;
- }
- // TODO Currently this test hangs
- //@Test
- public void testProtocolHandlerBasicConnectivity() throws Exception
- {
- int scaleout = 2;
-
- threadsReady = new CountDownLatch(scaleout);
- stopLatch = new CountDownLatch(scaleout);
-
- IServiceTransport transport =
- initializeTransport();
- String analysisEngineDescriptor = "TestAAE";
-
- UimaServiceProcessor processor =
- new UimaServiceProcessor(analysisEngineDescriptor);
- ServiceMockup service =
- new ServiceMockup(transport, processor, stopLatch);
- INoTaskAvailableStrategy waitStrategy =
- new DefaultNoTaskAvailableStrategy(1000);
- DefaultServiceProtocolHandler protocolHandler =
- new DefaultServiceProtocolHandler.Builder()
- .withProcessor(processor)
- .withService(service)
- .withNoTaskStrategy(waitStrategy)
- .withTransport(transport)
- .withDoneLatch(stopLatch)
- .withInitCompleteLatch(threadsReady)
- .build();
-
- service.setProtocolHandler(protocolHandler);
-
- service.initialize();
-
- service.start();
-
-// assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
- }
-
- private class ServiceMockup implements IService {
- private CountDownLatch stopLatch;
- private IServiceTransport transport;
- private IServiceProtocolHandler protocolHandler;
- private IServiceProcessor processor;
- private int scaleout = 2;
- ScheduledThreadPoolExecutor threadPool;
-
- public ServiceMockup(IServiceTransport transport, IServiceProcessor processor, CountDownLatch stopLatch) {
- this.transport = transport;
-
- this.processor = processor;
- this.stopLatch = stopLatch;
- }
- public void setProtocolHandler( IServiceProtocolHandler protocolHandler) {
- this.protocolHandler = protocolHandler;
- }
- @Override
- public void start() throws ServiceException {
- try {
- stopLatch.await();
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- }
-
- @Override
- public void stop() {
- threadPool.shutdown();
- protocolHandler.stop();
- transport.stop(false);
- processor.stop();
- }
- @Override
- public void quiesceAndStop() {
- protocolHandler.quiesceAndStop();
- threadPool.shutdown();
- transport.stop(true);
- processor.stop();
- }
- @Override
- public void initialize() throws ServiceInitializationException {
-
- List<Future<String>> threadHandleList = new ArrayList<Future<String>>();
- threadPool = new ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
-
- // Create and start worker threads that pull Work Items from a client
- for (int j = 0; j < scaleout; j++) {
- threadHandleList.add(threadPool.submit(protocolHandler));
- }
- try {
- // wait until all process threads initialize
- threadsReady.await();
-
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- threadPool.shutdownNow();
- throw new ServiceInitializationException(
- "Service interrupted during initialization - shutting down process threads");
- }
- }
-
- @Override
- public String getType() {
- // TODO Auto-generated method stub
- return null;
- }
-
- }
+ CountDownLatch threadsReady;
+
+ CountDownLatch stopLatch;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(AbstractServiceProcessor.CLASSPATH_SWITCH_PROP, "true");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ private IServiceTransport initializeTransport() throws Exception {
+ int scaleout = 1;
+ ITargetURI targetUrl = new HttpTargetURI(
+ "http://localhost:" + super.getPort() + "/" + super.getApp());
+ DefaultRegistryClient registryClient = new DefaultRegistryClient(targetUrl);
+
+ HttpServiceTransport transport = new HttpServiceTransport(registryClient, scaleout);
+ transport.initialize();
+ return transport;
+ }
+
+ // TODO Currently this test hangs
+ // @Test
+ public void testProtocolHandlerBasicConnectivity() throws Exception {
+ int scaleout = 2;
+
+ threadsReady = new CountDownLatch(scaleout);
+ stopLatch = new CountDownLatch(scaleout);
+
+ IServiceTransport transport = initializeTransport();
+ String analysisEngineDescriptor = "TestAAE";
+
+ UimaServiceProcessor processor = new UimaServiceProcessor(analysisEngineDescriptor);
+ ServiceMockup service = new ServiceMockup(transport, processor, stopLatch);
+ INoTaskAvailableStrategy waitStrategy = new DefaultNoTaskAvailableStrategy(1000);
+ DefaultServiceProtocolHandler protocolHandler = new DefaultServiceProtocolHandler.Builder()
+ .withProcessor(processor).withService(service).withNoTaskStrategy(waitStrategy)
+ .withTransport(transport).withDoneLatch(stopLatch).withInitCompleteLatch(threadsReady)
+ .build();
+
+ service.setProtocolHandler(protocolHandler);
+
+ service.initialize();
+
+ service.start();
+
+ // assertThat("Response Code", http.getResponseCode(), (equal((HttpStatus.OK_200)));
+ }
+
+ private class ServiceMockup implements IService {
+ private CountDownLatch stopLatch;
+
+ private IServiceTransport transport;
+
+ private IServiceProtocolHandler protocolHandler;
+
+ private IServiceProcessor processor;
+
+ private int scaleout = 2;
+
+ ScheduledThreadPoolExecutor threadPool;
+
+ public ServiceMockup(IServiceTransport transport, IServiceProcessor processor,
+ CountDownLatch stopLatch) {
+ this.transport = transport;
+
+ this.processor = processor;
+ this.stopLatch = stopLatch;
+ }
+
+ public void setProtocolHandler(IServiceProtocolHandler protocolHandler) {
+ this.protocolHandler = protocolHandler;
+ }
+
+ @Override
+ public void start() throws ServiceException {
+ try {
+ stopLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+
+ @Override
+ public void stop() {
+ threadPool.shutdown();
+ protocolHandler.stop();
+ transport.stop(false);
+ processor.stop();
+ }
+
+ @Override
+ public void quiesceAndStop() {
+ protocolHandler.quiesceAndStop();
+ threadPool.shutdown();
+ transport.stop(true);
+ processor.stop();
+ }
+
+ @Override
+ public void initialize() throws ServiceInitializationException {
+
+ List<Future<String>> threadHandleList = new ArrayList<Future<String>>();
+ threadPool = new ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
+
+ // Create and start worker threads that pull Work Items from a client
+ for (int j = 0; j < scaleout; j++) {
+ threadHandleList.add(threadPool.submit(protocolHandler));
+ }
+ try {
+ // wait until all process threads initialize
+ threadsReady.await();
+
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ threadPool.shutdownNow();
+ throw new ServiceInitializationException(
+ "Service interrupted during initialization - shutting down process threads");
+ }
+ }
+
+ @Override
+ public String getType() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
}
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java?rev=1864248&r1=1864247&r2=1864248&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/test/java/org/apache/uima/ducc/ps/transport/JunitTransportTestCase.java Fri Aug 2 17:30:46 2019
@@ -66,14 +66,17 @@ public class JunitTransportTestCase {
return new HashMap<Long, XStream>();
}
};
+
@Before
public void setUp() throws Exception {
- System.setProperty(AbstractServiceProcessor.CLASSPATH_SWITCH_PROP,"true");
+ System.setProperty(AbstractServiceProcessor.CLASSPATH_SWITCH_PROP, "true");
}
+
@After
public void tearDown() throws Exception {
-
+
}
+
private int getJettyPort() {
while (true) {
ServerSocket socket = null;