You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by bu...@apache.org on 2018/07/13 14:04:33 UTC

svn commit: r1835841 - in /uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service: main/PullService.java protocol/builtin/DefaultServiceProtocolHandler.java

Author: burn
Date: Fri Jul 13 14:04:33 2018
New Revision: 1835841

URL: http://svn.apache.org/viewvc?rev=1835841&view=rev
Log:
UIMA-5822 Delay first work request

Modified:
    uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
    uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/main/PullService.java Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -67,10 +67,10 @@ public class PullService implements ISer
 	private String clientURL;
 	private IRegistryClient registryClient;
 	// ******************************************
-	
+
 	// internal error handler
 	private IServiceErrorHandler errorHandler=null;
-	// 
+	//
 	private IServiceMonitor serviceMonitor=null;
 	// internal transport to communicate with remote client
 	private IServiceTransport transport=null;
@@ -84,12 +84,12 @@ public class PullService implements ISer
 	// holds Future to every process thread
 	private List<Future<String>> threadHandleList =
 			new ArrayList<>();
-	
+
 	private Lock initLock = new ReentrantLock();
-	
-	public PullService(String type) { 
+
+	public PullService(String type) {
 		this.type = type;
-		
+
 	}
 	public String getType() {
 		return type;
@@ -134,12 +134,12 @@ public class PullService implements ISer
 		registryClient = new DefaultRegistryClient(target);
 
 	}
-	
+
 	@Override
 	public void initialize() throws ServiceInitializationException {
 		// only one thread can call this method
 		initLock.lock();
-		
+
 		try {
 			if ( initialized ) {
 				// Already initialized
@@ -179,13 +179,13 @@ public class PullService implements ISer
 					   .withDoneLatch(stopLatch)
 					   .withInitCompleteLatch(threadsReady)
 					   .build();
-					   
-			
-			// first initialize Processors. The ServiceThreadFactory creates 
+
+
+			// first initialize Processors. The ServiceThreadFactory creates
 			// as many threads as defined in 'scaleout'
-			threadPool = 
+			threadPool =
 					new ScheduledThreadPoolExecutor(scaleout, new ServiceThreadFactory());
-			
+
 	    	// Create and start worker threads that pull Work Items from a client.
 			// Each worker thread calls processor.initialize() and counts down the
 			// 'threadsReady' latch. When all threads finish initializing they all
@@ -198,10 +198,10 @@ public class PullService implements ISer
 
 			initializeMonitor();
 			initializeTransport();
-			
+
 			initialized = true;
-			
-			
+
+
 		} catch( ServiceInitializationException e) {
 			throw e;
 		} catch( InterruptedException e) {
@@ -210,13 +210,13 @@ public class PullService implements ISer
 			throw new ServiceInitializationException("Service interrupted during initialization - shutting down process threads");
 		} catch( Exception e) {
 			throw new ServiceInitializationException("",e);
-		}  
+		}
 		finally {
 			initLock.unlock();
 		}
 
 	}
-	
+
 	@Override
 	public void start() throws IllegalStateException, ExecutionException, ServiceException {
 		if ( !initialized ) {
@@ -252,7 +252,7 @@ public class PullService implements ISer
 		stopTransport();
 		stopProtocolHandler();
 		stopServiceProcessor();
-        // monitor should be stopped last to keep posting updates to observer		
+        // monitor should be stopped last to keep posting updates to observer
 		stopMonitor();
 	}
 
@@ -260,8 +260,8 @@ public class PullService implements ISer
 		for (Future<String> future : threadHandleList) {
 			// print the return value of Future, notice the output delay in console
 			// because Future.get() waits for task to get completed
-			logger.log(Level.INFO,
-					"Thread:" + Thread.currentThread().getName() + " Terminated " + new Date() + "::" + future.get());
+			String result = future.get();
+			logger.log(Level.INFO, "Thread:" + Thread.currentThread().getName() + " Terminated " + new Date() + "::" + result);
 		}
 	}
 
@@ -307,7 +307,7 @@ public class PullService implements ISer
 		}
 	}
 	private void stopProtocolHandler() {
-		
+
 	}
 	private void stopTransport() {
 		transport.stop();

Modified: uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java?rev=1835841&r1=1835840&r2=1835841&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-pullservice/src/main/java/org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.java Fri Jul 13 14:04:33 2018
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,8 +44,8 @@ import org.apache.uima.util.Level;
 import org.apache.uima.util.Logger;
 
 /**
- * 
- * This protocol handler is a Runnable 
+ *
+ * This protocol handler is a Runnable
  *
  */
 public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
@@ -66,13 +66,13 @@ public class DefaultServiceProtocolHandl
 	private IService service;
 	// forces process threads to initialize serially
 	private ReentrantLock initLock = new ReentrantLock();
-	
+
 	private static AtomicInteger idGenerator = new AtomicInteger();
 
-	
-	private DefaultServiceProtocolHandler(Builder builder) { 
-		this.initLatch = builder.initLatch; 
-		this.stopLatch = builder.stopLatch; 
+
+	private DefaultServiceProtocolHandler(Builder builder) {
+		this.initLatch = builder.initLatch;
+		this.stopLatch = builder.stopLatch;
 		this.service = builder.service;
 		this.transport = builder.transport;
 		this.processor = builder.processor;
@@ -138,7 +138,7 @@ public class DefaultServiceProtocolHandl
 				throw new TransportException("Received invalid content (null) in response from client - rejecting request");
 			}
 			o = XStreamUtils.unmarshall(content);
-			
+
 		} catch ( Exception e) {
 			if ( !running ) {
 				throw new TransportException("Service stopping - rejecting request");
@@ -171,7 +171,7 @@ public class DefaultServiceProtocolHandl
 	}
 
 	private IMetaTaskTransaction callGet(IMetaTaskTransaction transaction) throws Exception {
-		transaction.setType(Type.Get); 
+		transaction.setType(Type.Get);
 		if ( logger.isLoggable(Level.FINE)) {
 			logger.log(Level.FINE, "ProtocolHandler calling GET");
 		}
@@ -179,7 +179,7 @@ public class DefaultServiceProtocolHandl
 	}
 	/**
 	 * Block until service start() is called
-	 * 
+	 *
 	 * @throws ServiceInitializationException
 	 */
 	private void awaitStart() throws ServiceInitializationException {
@@ -194,19 +194,25 @@ public class DefaultServiceProtocolHandl
 		// we may fail in initialize() in which case the ServiceInitializationException
 		// is thrown
 		initialize();
-		
+
 		// now wait for application to call start
 		awaitStart();
-		
+
 		// all threads intialized, enter running state
 
 		IMetaTaskTransaction transaction = null;
-		
+
 		if ( logger.isLoggable(Level.INFO)) {
 			logger.log(Level.INFO, ".............. Thread "+Thread.currentThread().getId() + " ready to process");
 		}
 
-		
+		logger.log(Level.INFO, "Wait for the initialized state to propagate to the SM " +
+		          "so any processing errors are not treated as initialization failures");
+		try {
+			Thread.sleep(30000);
+		} catch (InterruptedException e1) {
+		};
+
 		while (running) {
 
 			try {
@@ -218,13 +224,13 @@ public class DefaultServiceProtocolHandl
 					break;
 				}
 				if (transaction.getMetaTask() == null || transaction.getMetaTask().getUserSpaceTask() == null ) {
-					// the client has no tasks to give. 
+					// the client has no tasks to give.
 					noTaskStrategy.handleNoTaskSupplied();
 					continue;
 				}
 				Object task = transaction.getMetaTask().getUserSpaceTask();
-				
-				// send ACK 
+
+				// send ACK
 				transaction = callAck(transaction);
 				if (!running) {
 					break;
@@ -259,8 +265,8 @@ public class DefaultServiceProtocolHandl
 					}).start();
 					running = false;
 				}
-					
-				
+
+
 
 			} catch( IllegalStateException e) {
 				break;
@@ -269,14 +275,14 @@ public class DefaultServiceProtocolHandl
 			}
 			catch (Exception e) {
 				logger.log(Level.WARNING,"",e);
-			} 		
+			}
 		}
 		stopLatch.countDown();
 		logger.log(Level.INFO,"ProtocolHandler terminated");
 		return String.valueOf(Thread.currentThread().getId());
 	}
 
-	
+
 	private void delegateStop() {
 		service.stop();
 	}
@@ -302,8 +308,8 @@ public class DefaultServiceProtocolHandl
 	public void setTransport(IServiceTransport transport) {
 		this.transport = transport;
 	}
-	
-	
+
+
 	 public static class Builder {
 			private IServiceTransport transport;
 			private IServiceProcessor processor;
@@ -320,15 +326,15 @@ public class DefaultServiceProtocolHandl
 			public Builder withProcessor(IServiceProcessor processor) {
 				this.processor = processor;
 				return this;
-			}			
+			}
 			public Builder withInitCompleteLatch(CountDownLatch initLatch) {
 				this.initLatch = initLatch;
 				return this;
-			}			
+			}
 			public Builder withDoneLatch(CountDownLatch stopLatch) {
 				this.stopLatch = stopLatch;
 				return this;
-			}			
+			}
 			public Builder withNoTaskStrategy(INoTaskAvailableStrategy strategy) {
 				this.strategy = strategy;
 				return this;