You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by bu...@apache.org on 2017/09/21 20:39:40 UTC

svn commit: r1809225 - in /uima/uima-ducc/trunk: uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ uima-ducc-container/src/main/java/org/apache/uima/ducc/cont...

Author: burn
Date: Thu Sep 21 20:39:39 2017
New Revision: 1809225

URL: http://svn.apache.org/viewvc?rev=1809225&view=rev
Log:
UIMA-5551 Added optional registry support for pull services

Added:
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java   (with props)
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java   (with props)
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java   (with props)
Modified:
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java
    uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
    uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java

Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,66 @@
+package org.apache.uima.ducc.container.sd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * 
+ * Loads a configuration properties file from the classpath used to 
+ * connect pull services to their clients via a registry.
+ * e.g. the Incremental Ingestion services:
+ *    -  TAS, Note & Summary need to find the DB
+ *    -  Note & Summary need to find TAS
+ *    -  DB and TAS must register their locations.
+ *
+ * Replaces placeholders of the form ${environment-variable}
+ */
+public class ConfigurationProperties {
+
+  private static final String service_configuration_property = "ducc.service.configuration";
+  private static ConfigurationProperties instance = null;
+  private Properties props;
+  
+  synchronized static public Properties getProperties() {
+    if (instance == null) {
+      instance = new ConfigurationProperties();
+    }
+    return instance.props;
+  }
+  
+  private ConfigurationProperties() {
+    String propertyFile = System.getProperty(service_configuration_property);
+    if (propertyFile == null) {
+      throw new IllegalArgumentException("Missing value for system property: " + service_configuration_property);
+    }
+    InputStream inputStream = ConfigurationProperties.class.getClassLoader().getResourceAsStream(propertyFile);
+    if (inputStream != null) {
+      props = new Properties();
+      try {
+        props.load(inputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      for ( Entry<Object, Object> entry : props.entrySet()) {
+        String value = (String) entry.getValue();
+        // Replace placeholders of the form ${env-var-name}
+        int i = value.indexOf("${");
+        if (i >= 0) {
+          do {
+            int j = value.indexOf('}', i);
+            if (j > 0) {
+              String envKey = value.substring(i+2, j);
+              String envValue = System.getenv(envKey);
+              value = value.substring(0,i) + envValue + value.substring(j+1);
+            }
+            i = value.indexOf("${", i+2);   // Check if more to replace
+          } while(i >= 0);
+          props.put(entry.getKey(), value);    // Update with expanded value
+        }
+      }
+    } else {
+      throw new RuntimeException("Failed to find " + propertyFile + " in the classpath");
+    }
+  }
+}

Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java Thu Sep 21 20:39:39 2017
@@ -97,9 +97,9 @@ public class DuccServiceDriver implement
 			
 			protocolHandler = new DuccServiceTaskProtocolHandler(taskAllocator);
 			try {
-				
+				logger.log(Level.INFO, "Initializing protocol handler ...");
 				protocolHandler.initialize(properties);
-				logger.log(Level.INFO, "... Protocol handler initialized ...");
+				logger.log(Level.INFO, "Initializing transport ...");
 				transport.setTaskProtocolHandler(protocolHandler);
 				transport.initialize(properties);
 				

Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,66 @@
+package org.apache.uima.ducc.container.sd;
+
+/**
+ * Maintains a registry of services and the details needed to provide access. The format of the
+ * details is application specific, e.g. could be simply "host:port"
+ *
+ */
+public interface ServiceRegistry {
+
+  /**
+   * Establish connection to the specified registry
+   * 
+   * @param location
+   *          - The location of the registry
+   * @return - true if connection succeeds
+   */
+  boolean initialize(String location);
+  
+  /**
+   * Register an instance of the service along with application-specific details
+   *
+   * @param name
+   *          - The name of the service
+   * @param address
+   *          - The address/url of the service instance
+   * @param details
+   *          - Extra details about the instance
+   *
+   * @return - null or the previous details if this address was already registered
+   */
+  String register(String name, String address, String details);
+
+  /**
+   * Queries all registered instances. Returns an array with one entry per instance, each entry
+   * being a 2-element array holding the address & details
+   *
+   * @param name
+   *          - service name
+   * @return - a Nx2 2-D array of addresses & details
+   */
+  String[][] query(String name);
+
+  /**
+   * Fetches the address of a service instance, blocking if none is available.
+   * If more than 1 is available chooses which to return using an appropriate algorithm,
+   * e.g. least-used or round-robin or random or ...
+   *
+   * @param name
+   *          - service name
+   * @return - address
+   */
+  String fetch(String name);
+
+  /**
+   * Remove an entry.
+   *
+   * @param name
+   *          - service name
+   * @param address
+   *          - Indicates which instance to remove.
+   *
+   * @return - true if succeeds
+   */
+  boolean unregister(String name, String address);
+
+}
\ No newline at end of file

Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,225 @@
+package org.apache.uima.ducc.container.sd;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a simple service registry using the shared filesystem. 
+ * 
+ * A registered entry consists of:
+ *  - a folder named for the service in a shared base registry directory. 
+ *  - a file in this folder whose name represents the address of the service instance,  
+ *    holding any additional details about the service instance.
+ *    
+ * The service name and address are encoded into valid filenames.
+ * A shutdown hook is created so entries are removed during normal process termination.
+ *
+ * Entries can only be removed by their creator. Entries should have the same visibility as the
+ * base registry directory. 
+ *
+ * NOTE: Alternative implementations could be supported by putting a class with the same name 
+ * earlier in the classpath, or by replacing this one with a proxy that loads an arbitrary class.
+ * 
+ */
+public class ServiceRegistry_impl implements ServiceRegistry {
+
+  Map<String, Map<String, String>> serviceNameMap = new HashMap<String, Map<String, String>>();
+
+  Map<String, String> serviceMap;
+
+  private File registryDir;
+
+  private static ServiceRegistry instance = new ServiceRegistry_impl();
+  
+  public static ServiceRegistry getInstance() {
+    return instance;
+  }
+  
+  public boolean initialize(String location) {
+    registryDir = new File(location);
+    if (!registryDir.exists()) {
+      registryDir.mkdir();
+    }
+    return registryDir.canWrite();
+  }
+
+  // Create file "address" holding "details"
+  // Return previous value if already registered, or null if not.
+  @Override
+  public String register(String name, String address, String details) {
+    String prevDetails = null;
+    File serviceDir = new File(registryDir, encode(name));
+    if (!serviceDir.exists()) {
+      serviceDir.mkdir();
+    }
+    // Could be a race condition if the dir is deleted at this time
+    File instanceFile = new File(serviceDir, encode(address));
+    if (instanceFile.exists()) {
+      try (BufferedReader reader = new BufferedReader(new FileReader(instanceFile))) {
+        prevDetails = reader.readLine();
+      } catch (IOException e) {
+        System.err.println("ERROR: Failed to read previous details when updating registry file: "
+                + instanceFile.getAbsolutePath() + " - " + e);
+        prevDetails = e.toString();
+      }
+    }
+    try (PrintWriter writer = new PrintWriter(instanceFile)) {
+      writer.println(details);
+    } catch (FileNotFoundException e) {
+      System.err.println("ERROR: Failed to create registry file: " + instanceFile.getAbsolutePath()
+              + " - " + e);
+    }
+
+    // Ensure that this instance is unregistered when process ends
+    Runtime.getRuntime().addShutdownHook(new UnregisterHook(this, name, address));
+    
+    return prevDetails;
+  }
+
+  // Return first address or block if none available
+  @Override
+  public synchronized String fetch(String name) {
+    
+    File serviceDir = new File(registryDir, encode(name));
+    while(true) {
+      if (serviceDir.exists()) {
+        File[] files = serviceDir.listFiles();
+        if (files.length > 0) {
+          return decode(files[0].getName());
+        }
+      }
+      try {
+        System.out.println("!! fetch will try "+name+" again in 15 secs");
+        Thread.sleep(15000);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  // Return an array of 2-element arrays holding address & details
+  @Override
+  public String[][] query(String name) {
+    File serviceDir = new File(registryDir, encode(name));
+    if (!serviceDir.exists()) {
+      return new String[0][];
+    }
+    File[] files = serviceDir.listFiles();
+    String[][] addrs = new String[files.length][];
+    for (int i = 0; i < addrs.length; ++i) {
+      String details;
+      try (BufferedReader reader = new BufferedReader(new FileReader(files[i]))) {
+        details = reader.readLine();
+      } catch (IOException e) {
+        details = "ERROR: Failed to read instanceId from registry file: " + files[i].getAbsolutePath() + " - " + e;
+      }
+      String[] adPair = new String[2];
+      adPair[0] = decode(files[i].getName());
+      adPair[1] = details;
+      addrs[i] = adPair;
+    }
+    return addrs;
+  }
+
+  // Remove the file "name/address"
+  // Remove the directory when no instances left
+  @Override
+  public boolean unregister(String name, String address) {
+    File serviceDir = new File(registryDir, encode(name));
+    if (serviceDir.exists()) {
+      File instanceFile = new File(serviceDir, encode(address));
+      if (instanceFile.exists()) {
+        instanceFile.delete();
+        if (serviceDir.list().length == 0) {
+          // Could be a race if another thread is about to create an entry !
+          serviceDir.delete();
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // Encode/decode characters that are illegal in filenames.
+  // Only '/' is illegal but must also %-encode '%'
+  static String[] badChars = { "%", "%25", "/", "%2F" };
+
+  private String encode(String name) {
+    for (int i = 0; i < badChars.length; i += 2) {
+      name = name.replaceAll(badChars[i], badChars[i + 1]);
+    }
+    return name;
+  }
+
+  private String decode(String name) {
+    for (int i = 0; i < badChars.length; i += 2) {
+      name = name.replaceAll(badChars[i + 1], badChars[i]);
+    }
+    return name;
+  }
+
+  private class UnregisterHook extends Thread {
+    private ServiceRegistry registry;
+    private String name;
+    private String address;
+    UnregisterHook(ServiceRegistry registry, String name, String address) {
+      this.registry = registry;
+      this.name = name;
+      this.address = address;
+    }
+    public void run() {
+      registry.unregister(name, address);
+      System.out.println("!! Unregistered " + address + " in shotdown hook");
+    }
+  }
+
+  /* ================================================================================= */
+
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length != 3) {
+      System.out.println("Usage: service-name address-prefix num");
+      return;
+    }
+    String name = args[0];
+    ServiceRegistry reg = ServiceRegistry_impl.getInstance();
+    String[][] addrs = reg.query(name);
+    System.out.println("Service " + name + " has " + addrs.length + " instances");
+
+    int num = Integer.valueOf(args[2]);
+    for (int i = 0; i < num; ++i) {
+      reg.register(name, args[1] + i, "100" + i);
+      addrs = reg.query(name);
+      System.out.println("Service " + args[0] + " has " + addrs.length + " instances");
+      for (String[] addrDetails : addrs) {
+        System.out.println("   addr: " + addrDetails[0] + " details: " + addrDetails[1]);
+      }
+    }
+
+    String address = reg.fetch(name);
+    System.out.println("fetch returned: " + address);
+    
+    System.out.println("Sleeping for 30 secs");
+    Thread.sleep(30000);
+    for (int i = 1; i < num + 1; ++i) {
+      String addr = args[1] + i;
+      boolean ok = reg.unregister(name, addr);
+      System.out.println("Unregistered " + addr + " was " + ok);
+    }
+    
+    address = reg.fetch(name);
+    System.out.println("fetch returned: " + address);
+    
+    String addr = args[1] + 0;
+    reg.unregister(name, addr);
+    
+    address = reg.fetch(name);
+    System.out.println("fetch returned: " + address);
+    
+  }
+
+}

Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java Thu Sep 21 20:39:39 2017
@@ -29,6 +29,8 @@ public interface ServiceDriver extends L
 	public static String Application = "driver.application.name";
 	public static String Port = "driver.server.port";
 	public static String MaxThreads = "driver.server.max.threads";
+	public static String Registry = "registry";
+	// Perhaps should move all these keys into the shared ConfigurationProperties?
 
 	public void initialize(Properties props) throws DriverException;
 	public TaskAllocatorCallbackListener getTaskAllocator();

Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java Thu Sep 21 20:39:39 2017
@@ -32,6 +32,8 @@ import org.apache.uima.UIMAFramework;
 import org.apache.uima.ducc.common.utils.XStreamUtils;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
 import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
+import org.apache.uima.ducc.container.sd.ServiceRegistry;
+import org.apache.uima.ducc.container.sd.ServiceRegistry_impl;
 import org.apache.uima.ducc.container.sd.iface.ServiceDriver;
 import org.apache.uima.ducc.container.sd.task.error.TaskProtocolException;
 import org.apache.uima.ducc.container.sd.task.iface.TaskProtocolHandler;
@@ -54,6 +56,7 @@ public class HttpTaskTransportHandler im
     private volatile boolean running = false;
     // mux is used to synchronize start()
     private Object mux = new Object();
+    
 	public HttpTaskTransportHandler() {
 	}
 
@@ -87,8 +90,8 @@ public class HttpTaskTransportHandler im
 		}
 	}
 
-	public Server createServer(int httpPort, int maxThreads, String app,
-			TaskProtocolHandler handler) throws Exception {
+  public Server createServer(int httpPort, int maxThreads, String app, TaskProtocolHandler handler, String registryAddr) 
+          throws Exception {
 
 		// Server thread pool
 		QueuedThreadPool threadPool = new QueuedThreadPool();
@@ -112,9 +115,30 @@ public class HttpTaskTransportHandler im
 		context.setContextPath("/");
 		server.setHandler(context);
 
-		context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)),
-				app);
-		logger.log(Level.INFO,"Service Driver URL: "+ context.getServer().getURI().toString()+httpPort+app);//"Jetty URL: http://localhost:"+httpPort+app);
+		context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)), "/"+app);
+
+    // Establish the URL we could register for our customers
+    String taskUrl = server.getURI().toString();
+    if (taskUrl.endsWith("/")) {
+      taskUrl = taskUrl.substring(0, taskUrl.length() - 1);
+    }
+    taskUrl += ":" + httpPort + "/" + app;
+    logger.log(Level.INFO, "Service Driver URL: " + taskUrl); // e.g. http://localhost:8888/test");
+
+    // Register the task allocator's URL if a registry is specified
+    // The type of registry is determined by the registry class.
+    
+    String taskServerName = app;   // why not?
+    if (registryAddr != null) {
+      ServiceRegistry registry = ServiceRegistry_impl.getInstance();
+      if (registry.initialize(registryAddr)) {
+        registry.register(taskServerName, taskUrl, "");   // Will also create a shutdown hook to unregister
+        logger.log(Level.INFO,"Registered: " + taskServerName);
+      }
+    } else {
+      logger.log(Level.WARNING, "Registration skipped - registry=" + registryAddr + " server="+taskServerName);
+    }
+
 		return server;
 	}
 
@@ -122,12 +146,11 @@ public class HttpTaskTransportHandler im
 	public void initialize(Properties properties) throws TaskTransportException {
 		// TODO Auto-generated method stub
 		// Max cores
-		int cores = Runtime.getRuntime().availableProcessors();
-		String portString = (String) properties.get(ServiceDriver.Port);
-		String maxThreadsString = (String) properties
-				.get(ServiceDriver.MaxThreads);
-		String appName = (String) properties
-				.get(ServiceDriver.Application);
+    int cores = Runtime.getRuntime().availableProcessors();
+    String portString = (String) properties.get(ServiceDriver.Port);
+    String maxThreadsString = (String) properties.get(ServiceDriver.MaxThreads);
+    String appName = (String) properties.get(ServiceDriver.Application);
+    String registry = (String) properties.get(ServiceDriver.Registry);    // optional
 
 		int maxThreads = cores;
 		int httpPort = -1;
@@ -152,13 +175,12 @@ public class HttpTaskTransportHandler im
 			}
 		}
 		if (appName == null) {
-			throw new TaskTransportException("The required "
-					+ ServiceDriver.Application
-					+ " property is not specified");
+		  appName = "test";
+		  logger.log(Level.WARNING, "The "+ServiceDriver.Application+" property is not specified - using "+appName);
 		}
 		try {
 			// create and initialize Jetty Server
-			server = createServer(httpPort, maxThreads, appName, taskProtocolHandler);
+			server = createServer(httpPort, maxThreads, appName, taskProtocolHandler, registry);
 		} catch (Exception e) {
 			throw new TaskTransportException(e);
 		}

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Thu Sep 21 20:39:39 2017
@@ -19,9 +19,9 @@
 package org.apache.uima.ducc.transport.configuration.jp;
 import java.io.InvalidClassException;
 import java.lang.management.ManagementFactory;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Properties;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -39,7 +39,6 @@ import org.apache.http.entity.StringEnti
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
 import org.apache.http.impl.pool.BasicConnPool;
-import org.apache.http.impl.pool.BasicPoolEntry;
 import org.apache.http.util.EntityUtils;
 import org.apache.uima.ducc.common.IDuccUser;
 import org.apache.uima.ducc.common.NodeIdentity;
@@ -52,6 +51,10 @@ import org.apache.uima.ducc.container.ne
 import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
 import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
 import org.apache.uima.ducc.container.net.impl.TransactionId;
+import org.apache.uima.ducc.container.sd.ConfigurationProperties;
+import org.apache.uima.ducc.container.sd.ServiceRegistry;
+import org.apache.uima.ducc.container.sd.ServiceRegistry_impl;
+import org.apache.uima.ducc.container.sd.iface.ServiceDriver;
 
 public class DuccHttpClient {
 	private DuccLogger logger = new DuccLogger(DuccHttpClient.class);
@@ -68,21 +71,54 @@ public class DuccHttpClient {
 	private int ClientMaxConnections = 0;
 	private int ClientMaxConnectionsPerRoute = 0;
 	private int ClientMaxConnectionsPerHostPort = 0;
-     
-    public DuccHttpClient(JobProcessComponent duccComponent) {
-    	 this.duccComponent = duccComponent;
-    }
+  private ServiceRegistry registry = null;
+  private String taskServerName;
+  private Properties config = null;
+	
+  public DuccHttpClient(JobProcessComponent duccComponent) {
+    this.duccComponent = duccComponent;
+  }
+  
 	public void setScaleout(int scaleout) {
 		connPool.setMaxTotal(scaleout);
 		connPool.setDefaultMaxPerRoute(scaleout);
 		connPool.setMaxPerRoute(host, scaleout);
 	}
+	
+	// If no registry use the url in the system properties, e.g. JD/JP case
+	// The fetch should return one of the values saved by the most recent notification,
+	// but should block if no instance is currently registered.
 	public String getJdUrl() {
-		return jdUrl;
+	  if (registry == null) {
+	    return jdUrl;
+	  }
+	  String address = registry.fetch(taskServerName);   // Will block if none registered
+	  logger.info("getJdUrl", null, "Registry entry for", taskServerName, "is", address);
+	  return address;
 	}
 	
 	public void initialize(String jdUrl) throws Exception {
-		this.jdUrl = jdUrl;
+
+	  // If not specified get the url from the registry
+	  if (jdUrl == null || jdUrl.isEmpty()) {
+	    config = ConfigurationProperties.getProperties();    // Holds registry details AND service.type
+	    String registryLocn = config.getProperty(ServiceDriver.Registry);
+	    taskServerName = config.getProperty(ServiceDriver.Application);
+	    if (registryLocn != null && taskServerName != null) {
+	      registry = ServiceRegistry_impl.getInstance();
+	      if (!registry.initialize(registryLocn)) {
+	        registry = null;
+	      }
+	    } 
+	    if (registry == null) {
+	      throw new RuntimeException("Failed to connect to registry at "+registryLocn+" to locate server "+taskServerName);
+	    }
+	    logger.info("initialize", null, "Using registry at", registryLocn, "to locate server", taskServerName);
+	    jdUrl = getJdUrl();
+	  }
+	  this.jdUrl = jdUrl;
+		
+		logger.info("initialize", null, "Found jdUrl =", jdUrl);
 		
 		int pos = jdUrl.indexOf("//");
         int ipEndPos = jdUrl.indexOf(":", pos);
@@ -169,12 +205,13 @@ public class DuccHttpClient {
 		return nn;
 	}
 	private String getProcessName() {
-	  String pn = System.getProperty("UimaRequestServiceType");
-	  if (pn == null) {
-	    pn = System.getenv(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value());
+	  String pn = System.getenv(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value());
+	  if (config != null && config.containsKey("service.type")) {
+	    pn = config.getProperty("service.type");  // Indicates the type of service request
 	  }
 		return pn;
 	}
+	
     private void addCommonHeaders( IMetaCasTransaction transaction ) {
     	String location = "addCommonHeaders";
     	transaction.setRequesterAddress(getIP());
@@ -281,19 +318,25 @@ public class DuccHttpClient {
 			}
 		} 
 	}
+
 	private HttpResponse retryUntilSuccessfull(IMetaCasTransaction transaction, HttpPost postMethod) throws Exception {
         HttpResponse response=null;
 		// Only one thread attempts recovery. Other threads will block here
 		// until connection to the remote is restored. 
+		logger.error("retryUntilSucessfull", null, "Connection Lost to", postMethod.getURI(), "- Retrying Until Successful ...");
    		lock.lock();
-   		logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" - Connection Lost to "+postMethod.getURI()+" - Retrying Until Successfull ...");
 
          // retry indefinitely
 		while( duccComponent.isRunning() ) {
        		try {
        			// retry the command
+       		  jdUrl = getJdUrl();
+       		  URI jdUri = new URI(jdUrl);
+       		  postMethod.setURI(jdUri);
+       		  logger.warn("retryUntilSucessfull", null, "Trying to connect to", jdUrl);
        			response = httpClient.execute(postMethod);
-       			logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" Recovered Connection ...");
+       			logger.warn("retryUntilSucessfull", null, "Recovered Connection");
+
        			// success, so release the lock so that other waiting threads
        			// can retry command
        		    if ( lock.isHeldByCurrentThread()) {
@@ -305,12 +348,14 @@ public class DuccHttpClient {
        		} catch( HttpHostConnectException exx ) {
        			// Connection still not available so sleep awhile
        			synchronized(postMethod) {
+       			  logger.warn("retryUntilSucessfull", null, "Connection failed - retry in", duccComponent.getThreadSleepTime()/1000, "secs");
        				postMethod.wait(duccComponent.getThreadSleepTime());
        			}
        		}
         }
 		return response;
 	}
+
 	public static void main(String[] args) {
 		try {
 			HttpPost postMethod = new HttpPost(args[0]);
@@ -327,7 +372,7 @@ public class DuccHttpClient {
 			// HTTP request. HttpClient actually enforces this. So
 			// do a POST instead of a GET.
 			transaction.setType(Type.Get);  // Tell JD you want a Work Item
-			String command = Type.Get.name();
+			//String command = Type.Get.name();
 	    	System.out.println("HttpWorkerThread.run() "+ "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
 			// send a request to JD and wait for a reply
 	    	transaction = client.execute(transaction, postMethod);
@@ -337,7 +382,7 @@ public class DuccHttpClient {
 				System.out.println("CAS:"+transaction.getMetaCas().getUserSpaceCas());
 	    		// Confirm receipt of the CAS. 
 				transaction.setType(Type.Ack);
-				command = Type.Ack.name();
+				//command = Type.Ack.name();
 				tid = new TransactionId(seq.incrementAndGet(), minor++);
 				transaction.setTransactionId(tid);
 				System.out.println("run  Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
@@ -351,7 +396,7 @@ public class DuccHttpClient {
 
 	        }
 			transaction.setType(Type.End);
-			command = Type.End.name();
+			//command = Type.End.name();
 			tid = new TransactionId(seq.incrementAndGet(), minor++);
 			transaction.setTransactionId(tid);
 			IPerformanceMetrics metricsWrapper =
@@ -369,4 +414,4 @@ public class DuccHttpClient {
 		}
 	}
 	
-}
\ No newline at end of file
+}

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Thu Sep 21 20:39:39 2017
@@ -218,16 +218,16 @@ public class HttpWorkerThread implements
 						logger.info("run", null, "T["+Thread.currentThread().getId()+"] - Regained Connection to JD");
 					}
                     
+					// If the client did not provide a Work Item, reduce frequency of Get requests
+					// by sleeping in between Get's. Synchronize so only one thread is polling for work
+
                     // if the JD did not provide a Work Item, most likely the CR is
 					// done. In such case, reduce frequency of Get requests
 					// by sleeping in between Get's. Eventually the OR will 
 					// deallocate this process and the thread will exit
 					if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
-    					logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd Response: JD is presently out of work items. Will retry in "+duccComponent.getThreadSleepTime()/1000+" seconds.");
+					  logger.info("run", null, "Client is out of work - will retry quietly every",duccComponent.getThreadSleepTime()/1000,"secs.");
     					
-						// the JD says there are no more WIs. Sleep awhile
-						// do a GET in case JD changes its mind. The JP will
-						// eventually be stopped by the agent
     			  // Retry at the start of this block as another thread may have just exited with work
     				// so the TAS (or JD) may now have a lot of work.	
  						synchronized (HttpWorkerThread.class) {

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Thu Sep 21 20:39:39 2017
@@ -277,13 +277,13 @@ implements IJobProcessor{
 				
 				getLogger().info("start", null,"Ducc JP JobType="+jobType);
 				httpClient = new DuccHttpClient(this);
-				String jdURL="";
+				String jdURL = "";
 				try {
 					jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
-					// initialize http client. It tests the connection and fails
-					// if unable to connect
+					// Test the connection and fail if unable to connect
+					// Gets the url from the registry if not in the system properties
 					httpClient.initialize(jdURL);
-					logger.info("start", null,"The JP Connected To JD Using URL "+jdURL);
+					logger.info("start", null,"The JP Connected To JD Using URL "+httpClient.getJdUrl());
 				} catch( Exception ee ) {
 					if ( ee.getCause() != null && ee instanceof java.net.ConnectException ) {
 						logger.error("start", null, "JP Process Unable To Connect to the JD Using Provided URL:"+jdURL+" Unable to Continue - Shutting Down JP");

Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java Thu Sep 21 20:39:39 2017
@@ -31,7 +31,6 @@ import org.apache.uima.ducc.common.conta
 import org.apache.uima.ducc.common.utils.Utils;
 import org.apache.uima.ducc.transport.DuccExchange;
 import org.apache.uima.ducc.transport.DuccTransportConfiguration;
-import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
 import org.apache.uima.ducc.transport.configuration.jp.AgentSession;
 import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
 import org.springframework.beans.factory.annotation.Autowired;