You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by bu...@apache.org on 2018/07/13 14:04:33 UTC
svn commit: r1835841 - in
/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service:
main/PullService.java protocol/builtin/DefaultServiceProtocolHandler.java
Author: burn
Date: Fri Jul 13 14:04:33 2018
New Revision: 1835841
URL: http://svn.apache.org/viewvc?rev=1835841&view=rev
Log:
UIMA-5822 Delay first work request
Modified:
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -67,10 +67,10 @@ public class PullService implements ISer
private String clientURL;
private IRegistryClient registryClient;
// ******************************************
-
+
// internal error handler
private IServiceErrorHandler errorHandler=null;
- //
+ //
private IServiceMonitor serviceMonitor=null;
// internal transport to communicate with remote client
private IServiceTransport transport=null;
@@ -84,12 +84,12 @@ public class PullService implements ISer
// holds Future to every process thread
private List<Future<String>> threadHandleList =
new ArrayList<>();
-
+
private Lock initLock = new ReentrantLock();
-
- public PullService(String type) {
+
+ public PullService(String type) {
this.type = type;
-
+
}
public String getType() {
return type;
@@ -134,12 +134,12 @@ public class PullService implements ISer
registryClient = new DefaultRegistryClient(target);
}
-
+
@Override
public void initialize() throws ServiceInitializationException {
// only one thread can call this method
initLock.lock();
-
+
try {
if ( initialized ) {
// Already initialized
@@ -179,13 +179,13 @@ public class PullService implements ISer
.withDoneLatch(stopLatch)
.withInitCompleteLatch(threadsReady)
.build();
-
-
- // first initialize Processors. The ServiceThreadFactory creates
+
+
+ // first initialize Processors. The ServiceThreadFactory creates
// as many threads as defined in 'scaleout'
- threadPool =
+ threadPool =
new ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
-
+
// Create and start worker threads that pull Work Items from a client.
// Each worker thread calls processor.initialize() and counts down the
// 'threadsReady' latch. When all threads finish initializing they all
@@ -198,10 +198,10 @@ public class PullService implements ISer
initializeMonitor();
initializeTransport();
-
+
initialized = true;
-
-
+
+
} catch( ServiceInitializationException e) {
throw e;
} catch( InterruptedException e) {
@@ -210,13 +210,13 @@ public class PullService implements ISer
throw new ServiceInitializationException("Service interrupted during initialization - shutting down process threads");
} catch( Exception e) {
throw new ServiceInitializationException("",e);
- }
+ }
finally {
initLock.unlock();
}
}
-
+
@Override
public void start() throws IllegalStateException, ExecutionException, ServiceException {
if ( !initialized ) {
@@ -252,7 +252,7 @@ public class PullService implements ISer
stopTransport();
stopProtocolHandler();
stopServiceProcessor();
- // monitor should be stopped last to keep posting updates to observer
+ // monitor should be stopped last to keep posting updates to observer
stopMonitor();
}
@@ -260,8 +260,8 @@ public class PullService implements ISer
for (Future<String> future : threadHandleList) {
// print the return value of Future, notice the output delay in console
// because Future.get() waits for task to get completed
- logger.log(Level.INFO,
- "Thread:" + Thread.currentThread().getName() + " Terminated " + new Date() + "::" + future.get());
+ String result = future.get();
+ logger.log(Level.INFO, "Thread:" + Thread.currentThread().getName() + " Terminated " + new Date() + "::" + result);
}
}
@@ -307,7 +307,7 @@ public class PullService implements ISer
}
}
private void stopProtocolHandler() {
-
+
}
private void stopTransport() {
transport.stop();
Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,8 +44,8 @@ import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;
/**
- *
- * This protocol handler is a Runnable
+ *
+ * This protocol handler is a Runnable
*
*/
public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
@@ -66,13 +66,13 @@ public class DefaultServiceProtocolHandl
private IService service;
// forces process threads to initialize serially
private ReentrantLock initLock = new ReentrantLock();
-
+
private static AtomicInteger idGenerator = new AtomicInteger();
-
- private DefaultServiceProtocolHandler(Builder builder) {
- this.initLatch = builder.initLatch;
- this.stopLatch = builder.stopLatch;
+
+ private DefaultServiceProtocolHandler(Builder builder) {
+ this.initLatch = builder.initLatch;
+ this.stopLatch = builder.stopLatch;
this.service = builder.service;
this.transport = builder.transport;
this.processor = builder.processor;
@@ -138,7 +138,7 @@ public class DefaultServiceProtocolHandl
throw new TransportException("Received invalid content (null) in response from client - rejecting request");
}
o = XStreamUtils.unmarshall(content);
-
+
} catch ( Exception e) {
if ( !running ) {
throw new TransportException("Service stopping - rejecting request");
@@ -171,7 +171,7 @@ public class DefaultServiceProtocolHandl
}
private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) throws Exception {
- transaction.setType(Type.Get);
+ transaction.setType(Type.Get);
if ( logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "ProtocolHandler calling GET");
}
@@ -179,7 +179,7 @@ public class DefaultServiceProtocolHandl
}
/**
* Block until service start() is called
- *
+ *
* @throws ServiceInitializationException
*/
private void awaitStart() throws ServiceInitializationException {
@@ -194,19 +194,25 @@ public class DefaultServiceProtocolHandl
// we may fail in initialize() in which case the ServiceInitializationException
// is thrown
initialize();
-
+
// now wait for application to call start
awaitStart();
-
+
// all threads initialized, enter running state
IMetaTaskTransaction transaction = null;
-
+
if ( logger.isLoggable(Level.INFO)) {
logger.log(Level.INFO, ".............. Thread "+Thread.currentThread().getId() + " ready to process");
}
-
+ logger.log(Level.INFO, "Wait for the initialized state to propagate to the SM " +
+ "so any processing errors are not treates as initialization failures");
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException e1) {
+ };
+
while (running) {
try {
@@ -218,13 +224,13 @@ public class DefaultServiceProtocolHandl
break;
}
if (transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null ) {
- // the client has no tasks to give.
+ // the client has no tasks to give.
noTaskStrategy.handleNoTaskSupplied();
continue;
}
Object task = transaction.getMetaTask().getUserSpaceTask();
-
- // send ACK
+
+ // send ACK
transaction = callAck(transaction);
if (!running) {
break;
@@ -259,8 +265,8 @@ public class DefaultServiceProtocolHandl
}).start();
running = false;
}
-
-
+
+
} catch( IllegalStateException e) {
break;
@@ -269,14 +275,14 @@ public class DefaultServiceProtocolHandl
}
catch (Exception e) {
logger.log(Level.WARNING,"",e);
- }
+ }
}
stopLatch.countDown();
logger.log(Level.INFO,"ProtocolHandler terminated");
return String.valueOf(Thread.currentThread().getId());
}
-
+
private void delegateStop() {
service.stop();
}
@@ -302,8 +308,8 @@ public class DefaultServiceProtocolHandl
public void setTransport(IServiceTransport transport) {
this.transport = transport;
}
-
-
+
+
public static class Builder {
private IServiceTransport transport;
private IServiceProcessor processor;
@@ -320,15 +326,15 @@ public class DefaultServiceProtocolHandl
public Builder withProcessor(IServiceProcessor processor) {
this.processor = processor;
return this;
- }
+ }
public Builder withInitCompleteLatch(CountDownLatch initLatch) {
this.initLatch = initLatch;
return this;
- }
+ }
public Builder withDoneLatch(CountDownLatch stopLatch) {
this.stopLatch = stopLatch;
return this;
- }
+ }
public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
this.strategy = strategy;
return this;