You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uima.apache.org by bu...@apache.org on 2017/09/21 20:39:40 UTC
svn commit: r1809225 - in /uima/uima-ducc/trunk:
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/
uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/
uima-ducc-container/src/main/java/org/apache/uima/ducc/cont...
Author: burn
Date: Thu Sep 21 20:39:39 2017
New Revision: 1809225
URL: http://svn.apache.org/viewvc?rev=1809225&view=rev
Log:
UIMA-5551 Added optional registry support for pull services
Added:
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java (with props)
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java (with props)
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java (with props)
Modified:
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java
uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java
Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,66 @@
+package org.apache.uima.ducc.container.sd;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Loads a configuration properties file from the classpath, used to
+ * connect pull services to their clients via a registry.
+ * e.g. the Incremental Ingestion services:
+ *   - TAS, Note and Summary need to find the DB
+ *   - Note and Summary need to find TAS
+ *   - DB and TAS must register their locations.
+ *
+ * The name of the resource is taken from the system property
+ * "ducc.service.configuration".
+ *
+ * Placeholders of the form ${environment-variable} in property values are
+ * replaced by the value of the named environment variable. A placeholder that
+ * names an undefined variable, or that has no closing brace, is left unchanged.
+ * Text inserted by a substitution is not scanned again for placeholders.
+ *
+ * The file is loaded once, on the first call to getProperties(), and the same
+ * Properties object is returned to every caller.
+ */
+public class ConfigurationProperties {
+
+  private static final String service_configuration_property = "ducc.service.configuration";
+  private static ConfigurationProperties instance = null;
+  private Properties props;
+
+  /**
+   * Returns the shared configuration, loading it on first use.
+   *
+   * @return the loaded properties with environment-variable placeholders expanded
+   * @throws IllegalArgumentException
+   *           if the system property naming the configuration file is not set
+   * @throws RuntimeException
+   *           if the file is not found in the classpath or cannot be read
+   */
+  synchronized static public Properties getProperties() {
+    if (instance == null) {
+      instance = new ConfigurationProperties();
+    }
+    return instance.props;
+  }
+
+  // Loads the file named by the system property and expands the placeholders in every value.
+  private ConfigurationProperties() {
+    String propertyFile = System.getProperty(service_configuration_property);
+    if (propertyFile == null) {
+      throw new IllegalArgumentException("Missing value for system property: " + service_configuration_property);
+    }
+    Properties loaded = new Properties();
+    // try-with-resources tolerates a null resource, so the "not found" check can live inside
+    try (InputStream inputStream = ConfigurationProperties.class.getClassLoader().getResourceAsStream(propertyFile)) {
+      if (inputStream == null) {
+        throw new RuntimeException("Failed to find " + propertyFile + " in the classpath");
+      }
+      loaded.load(inputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    for (Entry<Object, Object> entry : loaded.entrySet()) {
+      String value = (String) entry.getValue();
+      String expanded = expand(value);
+      if (!expanded.equals(value)) {
+        entry.setValue(expanded); // Update with expanded value
+      }
+    }
+    props = loaded;
+  }
+
+  // Replaces each ${env-var-name} in value by the variable's value, scanning left to right.
+  private static String expand(String value) {
+    int start = value.indexOf("${");
+    if (start < 0) {
+      return value; // Nothing to replace
+    }
+    StringBuilder result = new StringBuilder();
+    int copied = 0; // Everything before this index has already been appended
+    while (start >= 0) {
+      int end = value.indexOf('}', start + 2);
+      if (end < 0) {
+        break; // Unterminated placeholder - keep the remainder as-is
+      }
+      String envValue = System.getenv(value.substring(start + 2, end));
+      result.append(value, copied, start);
+      if (envValue != null) {
+        result.append(envValue);
+      } else {
+        // Undefined variable - keep the placeholder rather than inserting the text "null"
+        result.append(value, start, end + 1);
+      }
+      copied = end + 1;
+      // Resume after the placeholder so an adjacent one is never skipped
+      start = value.indexOf("${", copied);
+    }
+    result.append(value, copied, value.length());
+    return result.toString();
+  }
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ConfigurationProperties.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/DuccServiceDriver.java Thu Sep 21 20:39:39 2017
@@ -97,9 +97,9 @@ public class DuccServiceDriver implement
protocolHandler = new DuccServiceTaskProtocolHandler(taskAllocator);
try {
-
+ logger.log(Level.INFO, "Initializing protocol handler ...");
protocolHandler.initialize(properties);
- logger.log(Level.INFO, "... Protocol handler initialized ...");
+ logger.log(Level.INFO, "Initializing transport ...");
transport.setTaskProtocolHandler(protocolHandler);
transport.initialize(properties);
Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,66 @@
+package org.apache.uima.ducc.container.sd;
+
+/**
+ * Maintains a registry of services and the details needed to provide access to them.
+ * A service is identified by its name and may have several registered instances, each
+ * identified by its address. The format of the details is application specific,
+ * e.g. could be simply "host:port".
+ */
+public interface ServiceRegistry {
+
+ /**
+ * Establishes the connection to the specified registry.
+ *
+ * @param location
+ * - The location of the registry
+ * @return - true if the connection succeeds
+ */
+ boolean initialize(String location);
+
+ /**
+ * Registers an instance of the service along with application-specific details.
+ *
+ * @param name
+ * - The name of the service
+ * @param address
+ * - The address/url of the service instance
+ * @param details
+ * - Extra details about the instance
+ *
+ * @return - the previous details if this address was already registered, otherwise null
+ */
+ String register(String name, String address, String details);
+
+ /**
+ * Queries all registered instances of a service. Returns one entry per instance, each
+ * entry being a 2-element array holding the address followed by the details.
+ *
+ * @param name
+ * - service name
+ * @return - an Nx2 2-D array of addresses and details
+ */
+ String[][] query(String name);
+
+ /**
+ * Fetches the address of a service instance, blocking if none is available.
+ * If more than one is available, chooses which to return using an appropriate algorithm,
+ * e.g. least-used or round-robin or random or ...
+ *
+ * @param name
+ * - service name
+ * @return - the address of one registered instance
+ */
+ String fetch(String name);
+
+ /**
+ * Removes the entry for one instance of a service.
+ *
+ * @param name
+ * - service name
+ * @param address
+ * - Indicates which instance to remove.
+ *
+ * @return - true if the removal succeeds
+ */
+ boolean unregister(String name, String address);
+
+}
\ No newline at end of file
Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java?rev=1809225&view=auto
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java (added)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java Thu Sep 21 20:39:39 2017
@@ -0,0 +1,225 @@
+package org.apache.uima.ducc.container.sd;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements a simple service registry using the shared filesystem.
+ *
+ * A registered entry consists of:
+ * - a folder named for for the service in a shared base registry directory.
+ * - a file in this folder whose name represents the address of the service instance,
+ * holding any additional details about the service instance.
+ *
+ * The service name and address are encoded into valid filenames.
+ * A shutdown hook is created so entries are removed during normal process termination.
+ *
+ * Entries can only be be removed by their creator. Entries should have the same visibility as the
+ * base registry directory.
+ *
+ * NOTE: Alternative implementations could be supported by putting a class with the same name
+ * earlier in the classpath, or by replacing this one with a proxy that loads an arbitrary class.
+ *
+ */
+public class ServiceRegistry_impl implements ServiceRegistry {
+
+ Map<String, Map<String, String>> serviceNameMap = new HashMap<String, Map<String, String>>();
+
+ Map<String, String> serviceMap;
+
+ private File registryDir;
+
+ private static ServiceRegistry instance = new ServiceRegistry_impl();
+
+ public static ServiceRegistry getInstance() {
+ return instance;
+ }
+
+ public boolean initialize(String location) {
+ registryDir = new File(location);
+ if (!registryDir.exists()) {
+ registryDir.mkdir();
+ }
+ return registryDir.canWrite();
+ }
+
+ // Create file "address" holding "details"
+ // Return previous value if already registered, or null if not.
+ @Override
+ public String register(String name, String address, String details) {
+ String prevDetails = null;
+ File serviceDir = new File(registryDir, encode(name));
+ if (!serviceDir.exists()) {
+ serviceDir.mkdir();
+ }
+ // Could be a race condition if the dir is deleted at this time
+ File instanceFile = new File(serviceDir, encode(address));
+ if (instanceFile.exists()) {
+ try (BufferedReader reader = new BufferedReader(new FileReader(instanceFile))) {
+ prevDetails = reader.readLine();
+ } catch (IOException e) {
+ System.err.println("ERROR: Failed to read previous details when updating registry file: "
+ + instanceFile.getAbsolutePath() + " - " + e);
+ prevDetails = e.toString();
+ }
+ }
+ try (PrintWriter writer = new PrintWriter(instanceFile)) {
+ writer.println(details);
+ } catch (FileNotFoundException e) {
+ System.err.println("ERROR: Failed to create registry file: " + instanceFile.getAbsolutePath()
+ + " - " + e);
+ }
+
+ // Ensure that this instance is unregistered when process ends
+ Runtime.getRuntime().addShutdownHook(new UnregisterHook(this, name, address));
+
+ return prevDetails;
+ }
+
+ // Return first address or block if none available
+ @Override
+ public synchronized String fetch(String name) {
+
+ File serviceDir = new File(registryDir, encode(name));
+ while(true) {
+ if (serviceDir.exists()) {
+ File[] files = serviceDir.listFiles();
+ if (files.length > 0) {
+ return decode(files[0].getName());
+ }
+ }
+ try {
+ System.out.println("!! fetch will try "+name+" again in 15 secs");
+ Thread.sleep(15000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ // Return an array of 2-element arrays holding address & details
+ @Override
+ public String[][] query(String name) {
+ File serviceDir = new File(registryDir, encode(name));
+ if (!serviceDir.exists()) {
+ return new String[0][];
+ }
+ File[] files = serviceDir.listFiles();
+ String[][] addrs = new String[files.length][];
+ for (int i = 0; i < addrs.length; ++i) {
+ String details;
+ try (BufferedReader reader = new BufferedReader(new FileReader(files[i]))) {
+ details = reader.readLine();
+ } catch (IOException e) {
+ details = "ERROR: Failed to read instanceId from registry file: " + files[i].getAbsolutePath() + " - " + e;
+ }
+ String[] adPair = new String[2];
+ adPair[0] = decode(files[i].getName());
+ adPair[1] = details;
+ addrs[i] = adPair;
+ }
+ return addrs;
+ }
+
+ // Remove the file "name/address"
+ // Remove the directory when no instances left
+ @Override
+ public boolean unregister(String name, String address) {
+ File serviceDir = new File(registryDir, encode(name));
+ if (serviceDir.exists()) {
+ File instanceFile = new File(serviceDir, encode(address));
+ if (instanceFile.exists()) {
+ instanceFile.delete();
+ if (serviceDir.list().length == 0) {
+ // Could be a race if another thread is about to create an entry !
+ serviceDir.delete();
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Encode/decode characters that are illegal in filenames.
+ // Only '/' is illegal but must also %-encode '%'
+ static String[] badChars = { "%", "%25", "/", "%2F" };
+
+ private String encode(String name) {
+ for (int i = 0; i < badChars.length; i += 2) {
+ name = name.replaceAll(badChars[i], badChars[i + 1]);
+ }
+ return name;
+ }
+
+ private String decode(String name) {
+ for (int i = 0; i < badChars.length; i += 2) {
+ name = name.replaceAll(badChars[i + 1], badChars[i]);
+ }
+ return name;
+ }
+
+ private class UnregisterHook extends Thread {
+ private ServiceRegistry registry;
+ private String name;
+ private String address;
+ UnregisterHook(ServiceRegistry registry, String name, String address) {
+ this.registry = registry;
+ this.name = name;
+ this.address = address;
+ }
+ public void run() {
+ registry.unregister(name, address);
+ System.out.println("!! Unregistered " + address + " in shotdown hook");
+ }
+ }
+
+ /* ================================================================================= */
+
+ public static void main(String[] args) throws InterruptedException {
+ if (args.length != 3) {
+ System.out.println("Usage: service-name address-prefix num");
+ return;
+ }
+ String name = args[0];
+ ServiceRegistry reg = ServiceRegistry_impl.getInstance();
+ String[][] addrs = reg.query(name);
+ System.out.println("Service " + name + " has " + addrs.length + " instances");
+
+ int num = Integer.valueOf(args[2]);
+ for (int i = 0; i < num; ++i) {
+ reg.register(name, args[1] + i, "100" + i);
+ addrs = reg.query(name);
+ System.out.println("Service " + args[0] + " has " + addrs.length + " instances");
+ for (String[] addrDetails : addrs) {
+ System.out.println(" addr: " + addrDetails[0] + " details: " + addrDetails[1]);
+ }
+ }
+
+ String address = reg.fetch(name);
+ System.out.println("fetch returned: " + address);
+
+ System.out.println("Sleeping for 30 secs");
+ Thread.sleep(30000);
+ for (int i = 1; i < num + 1; ++i) {
+ String addr = args[1] + i;
+ boolean ok = reg.unregister(name, addr);
+ System.out.println("Unregistered " + addr + " was " + ok);
+ }
+
+ address = reg.fetch(name);
+ System.out.println("fetch returned: " + address);
+
+ String addr = args[1] + 0;
+ reg.unregister(name, addr);
+
+ address = reg.fetch(name);
+ System.out.println("fetch returned: " + address);
+
+ }
+
+}
Propchange: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/ServiceRegistry_impl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/iface/ServiceDriver.java Thu Sep 21 20:39:39 2017
@@ -29,6 +29,8 @@ public interface ServiceDriver extends L
public static String Application = "driver.application.name";
public static String Port = "driver.server.port";
public static String MaxThreads = "driver.server.max.threads";
+ public static String Registry = "registry";
+ // Perhaps should move all these keys into the shared ConfigurationProperties?
public void initialize(Properties props) throws DriverException;
public TaskAllocatorCallbackListener getTaskAllocator();
Modified: uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-container/src/main/java/org/apache/uima/ducc/container/sd/task/transport/HttpTaskTransportHandler.java Thu Sep 21 20:39:39 2017
@@ -32,6 +32,8 @@ import org.apache.uima.UIMAFramework;
import org.apache.uima.ducc.common.utils.XStreamUtils;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction;
import org.apache.uima.ducc.container.net.iface.IMetaCasTransaction.Direction;
+import org.apache.uima.ducc.container.sd.ServiceRegistry;
+import org.apache.uima.ducc.container.sd.ServiceRegistry_impl;
import org.apache.uima.ducc.container.sd.iface.ServiceDriver;
import org.apache.uima.ducc.container.sd.task.error.TaskProtocolException;
import org.apache.uima.ducc.container.sd.task.iface.TaskProtocolHandler;
@@ -54,6 +56,7 @@ public class HttpTaskTransportHandler im
private volatile boolean running = false;
// mux is used to synchronize start()
private Object mux = new Object();
+
public HttpTaskTransportHandler() {
}
@@ -87,8 +90,8 @@ public class HttpTaskTransportHandler im
}
}
- public Server createServer(int httpPort, int maxThreads, String app,
- TaskProtocolHandler handler) throws Exception {
+ public Server createServer(int httpPort, int maxThreads, String app, TaskProtocolHandler handler, String registryAddr)
+ throws Exception {
// Server thread pool
QueuedThreadPool threadPool = new QueuedThreadPool();
@@ -112,9 +115,30 @@ public class HttpTaskTransportHandler im
context.setContextPath("/");
server.setHandler(context);
- context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)),
- app);
- logger.log(Level.INFO,"Service Driver URL: "+ context.getServer().getURI().toString()+httpPort+app);//"Jetty URL: http://localhost:"+httpPort+app);
+ context.addServlet(new ServletHolder(new TaskHandlerServlet(handler)), "/"+app);
+
+ // Establish the URL we could register for our customers
+ String taskUrl = server.getURI().toString();
+ if (taskUrl.endsWith("/")) {
+ taskUrl = taskUrl.substring(0, taskUrl.length() - 1);
+ }
+ taskUrl += ":" + httpPort + "/" + app;
+ logger.log(Level.INFO, "Service Driver URL: " + taskUrl); // e.g. http://localhost:8888/test");
+
+ // Register the task allocator's URL if a registry is specified
+ // The type of registry is determined by the registry class.
+
+ String taskServerName = app; // why not?
+ if (registryAddr != null) {
+ ServiceRegistry registry = ServiceRegistry_impl.getInstance();
+ if (registry.initialize(registryAddr)) {
+ registry.register(taskServerName, taskUrl, ""); // Will also create a shutdown hook to unregister
+ logger.log(Level.INFO,"Registered: " + taskServerName);
+ }
+ } else {
+ logger.log(Level.WARNING, "Registration skipped - registry=" + registryAddr + " server="+taskServerName);
+ }
+
return server;
}
@@ -122,12 +146,11 @@ public class HttpTaskTransportHandler im
public void initialize(Properties properties) throws TaskTransportException {
// TODO Auto-generated method stub
// Max cores
- int cores = Runtime.getRuntime().availableProcessors();
- String portString = (String) properties.get(ServiceDriver.Port);
- String maxThreadsString = (String) properties
- .get(ServiceDriver.MaxThreads);
- String appName = (String) properties
- .get(ServiceDriver.Application);
+ int cores = Runtime.getRuntime().availableProcessors();
+ String portString = (String) properties.get(ServiceDriver.Port);
+ String maxThreadsString = (String) properties.get(ServiceDriver.MaxThreads);
+ String appName = (String) properties.get(ServiceDriver.Application);
+ String registry = (String) properties.get(ServiceDriver.Registry); // optional
int maxThreads = cores;
int httpPort = -1;
@@ -152,13 +175,12 @@ public class HttpTaskTransportHandler im
}
}
if (appName == null) {
- throw new TaskTransportException("The required "
- + ServiceDriver.Application
- + " property is not specified");
+ appName = "test";
+ logger.log(Level.WARNING, "The "+ServiceDriver.Application+" property is not specified - using "+appName);
}
try {
// create and initialize Jetty Server
- server = createServer(httpPort, maxThreads, appName, taskProtocolHandler);
+ server = createServer(httpPort, maxThreads, appName, taskProtocolHandler, registry);
} catch (Exception e) {
throw new TaskTransportException(e);
}
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/DuccHttpClient.java Thu Sep 21 20:39:39 2017
@@ -19,9 +19,9 @@
package org.apache.uima.ducc.transport.configuration.jp;
import java.io.InvalidClassException;
import java.lang.management.ManagementFactory;
+import java.net.URI;
import java.util.Arrays;
import java.util.Properties;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -39,7 +39,6 @@ import org.apache.http.entity.StringEnti
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.pool.BasicConnPool;
-import org.apache.http.impl.pool.BasicPoolEntry;
import org.apache.http.util.EntityUtils;
import org.apache.uima.ducc.common.IDuccUser;
import org.apache.uima.ducc.common.NodeIdentity;
@@ -52,6 +51,10 @@ import org.apache.uima.ducc.container.ne
import org.apache.uima.ducc.container.net.impl.MetaCasTransaction;
import org.apache.uima.ducc.container.net.impl.PerformanceMetrics;
import org.apache.uima.ducc.container.net.impl.TransactionId;
+import org.apache.uima.ducc.container.sd.ConfigurationProperties;
+import org.apache.uima.ducc.container.sd.ServiceRegistry;
+import org.apache.uima.ducc.container.sd.ServiceRegistry_impl;
+import org.apache.uima.ducc.container.sd.iface.ServiceDriver;
public class DuccHttpClient {
private DuccLogger logger = new DuccLogger(DuccHttpClient.class);
@@ -68,21 +71,54 @@ public class DuccHttpClient {
private int ClientMaxConnections = 0;
private int ClientMaxConnectionsPerRoute = 0;
private int ClientMaxConnectionsPerHostPort = 0;
-
- public DuccHttpClient(JobProcessComponent duccComponent) {
- this.duccComponent = duccComponent;
- }
+ private ServiceRegistry registry = null;
+ private String taskServerName;
+ private Properties config = null;
+
+ public DuccHttpClient(JobProcessComponent duccComponent) {
+ this.duccComponent = duccComponent;
+ }
+
public void setScaleout(int scaleout) {
connPool.setMaxTotal(scaleout);
connPool.setDefaultMaxPerRoute(scaleout);
connPool.setMaxPerRoute(host, scaleout);
}
+
+ // If no registry use the url in the system properties, e.g. JD/JP case
+ // The fetch should return one of the values saved by the most recent notification.
+ // but should block if no instance currently registered.
public String getJdUrl() {
- return jdUrl;
+ if (registry == null) {
+ return jdUrl;
+ }
+ String address = registry.fetch(taskServerName); // Will block if none registered
+ logger.info("getJdUrl", null, "Registry entry for", taskServerName, "is", address);
+ return address;
}
public void initialize(String jdUrl) throws Exception {
- this.jdUrl = jdUrl;
+
+ // If not specified get the url from the registry
+ if (jdUrl == null || jdUrl.isEmpty()) {
+ config = ConfigurationProperties.getProperties(); // Holds registry details AND service.type
+ String registryLocn = config.getProperty(ServiceDriver.Registry);
+ taskServerName = config.getProperty(ServiceDriver.Application);
+ if (registryLocn != null && taskServerName != null) {
+ registry = ServiceRegistry_impl.getInstance();
+ if (!registry.initialize(registryLocn)) {
+ registry = null;
+ }
+ }
+ if (registry == null) {
+ throw new RuntimeException("Failed to connect to registry at "+registryLocn+" to locate server "+taskServerName);
+ }
+ logger.info("initialize", null, "Using registry at", registryLocn, "to locate server", taskServerName);
+ jdUrl = getJdUrl();
+ }
+ this.jdUrl = jdUrl;
+
+ logger.info("initialize", null, "Found jdUrl =", jdUrl);
int pos = jdUrl.indexOf("//");
int ipEndPos = jdUrl.indexOf(":", pos);
@@ -169,12 +205,13 @@ public class DuccHttpClient {
return nn;
}
private String getProcessName() {
- String pn = System.getProperty("UimaRequestServiceType");
- if (pn == null) {
- pn = System.getenv(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value());
+ String pn = System.getenv(IDuccUser.EnvironmentVariable.DUCC_ID_PROCESS.value());
+ if (config != null && config.containsKey("service.type")) {
+ pn = config.getProperty("service.type"); // Indicates the type of service request
}
return pn;
}
+
private void addCommonHeaders( IMetaCasTransaction transaction ) {
String location = "addCommonHeaders";
transaction.setRequesterAddress(getIP());
@@ -281,19 +318,25 @@ public class DuccHttpClient {
}
}
}
+
private HttpResponse retryUntilSuccessfull(IMetaCasTransaction transaction, HttpPost postMethod) throws Exception {
HttpResponse response=null;
// Only one thread attempts recovery. Other threads will block here
// until connection to the remote is restored.
+ logger.error("retryUntilSucessfull", null, "Connection Lost to", postMethod.getURI(), "- Retrying Until Successful ...");
lock.lock();
- logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" - Connection Lost to "+postMethod.getURI()+" - Retrying Until Successfull ...");
// retry indefinitely
while( duccComponent.isRunning() ) {
try {
// retry the command
+ jdUrl = getJdUrl();
+ URI jdUri = new URI(jdUrl);
+ postMethod.setURI(jdUri);
+ logger.warn("retryUntilSucessfull", null, "Trying to connect to", jdUrl);
response = httpClient.execute(postMethod);
- logger.error("retryUntilSucessfull", null, "Thread:"+Thread.currentThread().getId()+" Recovered Connection ...");
+ logger.warn("retryUntilSucessfull", null, "Recovered Connection");
+
// success, so release the lock so that other waiting threads
// can retry command
if ( lock.isHeldByCurrentThread()) {
@@ -305,12 +348,14 @@ public class DuccHttpClient {
} catch( HttpHostConnectException exx ) {
// Connection still not available so sleep awhile
synchronized(postMethod) {
+ logger.warn("retryUntilSucessfull", null, "Connection failed - retry in", duccComponent.getThreadSleepTime()/1000, "secs");
postMethod.wait(duccComponent.getThreadSleepTime());
}
}
}
return response;
}
+
public static void main(String[] args) {
try {
HttpPost postMethod = new HttpPost(args[0]);
@@ -327,7 +372,7 @@ public class DuccHttpClient {
// HTTP request. HttpClient actually enforces this. So
// do a POST instead of a GET.
transaction.setType(Type.Get); // Tell JD you want a Work Item
- String command = Type.Get.name();
+ //String command = Type.Get.name();
System.out.println("HttpWorkerThread.run() "+ "Thread Id:"+Thread.currentThread().getId()+" Requesting next WI from JD");;
// send a request to JD and wait for a reply
transaction = client.execute(transaction, postMethod);
@@ -337,7 +382,7 @@ public class DuccHttpClient {
System.out.println("CAS:"+transaction.getMetaCas().getUserSpaceCas());
// Confirm receipt of the CAS.
transaction.setType(Type.Ack);
- command = Type.Ack.name();
+ //command = Type.Ack.name();
tid = new TransactionId(seq.incrementAndGet(), minor++);
transaction.setTransactionId(tid);
System.out.println("run Thread:"+Thread.currentThread().getId()+" Sending ACK request - WI:"+transaction.getMetaCas().getSystemKey());
@@ -351,7 +396,7 @@ public class DuccHttpClient {
}
transaction.setType(Type.End);
- command = Type.End.name();
+ //command = Type.End.name();
tid = new TransactionId(seq.incrementAndGet(), minor++);
transaction.setTransactionId(tid);
IPerformanceMetrics metricsWrapper =
@@ -369,4 +414,4 @@ public class DuccHttpClient {
}
}
-}
\ No newline at end of file
+}
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/HttpWorkerThread.java Thu Sep 21 20:39:39 2017
@@ -218,16 +218,16 @@ public class HttpWorkerThread implements
logger.info("run", null, "T["+Thread.currentThread().getId()+"] - Regained Connection to JD");
}
+ // If the client did not provide a Work Item, reduce frequency of Get requests
+ // by sleeping in between Get's. Synchronize so only one thread is polling for work
+
// if the JD did not provide a Work Item, most likely the CR is
// done. In such case, reduce frequency of Get requests
// by sleeping in between Get's. Eventually the OR will
// deallocate this process and the thread will exit
if ( transaction.getMetaCas() == null || transaction.getMetaCas().getUserSpaceCas() == null) {
- logger.info("run", null,"Thread:"+Thread.currentThread().getId()+" Recv'd Response: JD is presently out of work items. Will retry in "+duccComponent.getThreadSleepTime()/1000+" seconds.");
+ logger.info("run", null, "Client is out of work - will retry quietly every",duccComponent.getThreadSleepTime()/1000,"secs.");
- // the JD says there are no more WIs. Sleep awhile
- // do a GET in case JD changes its mind. The JP will
- // eventually be stopped by the agent
// Retry at the start of this block as another thread may have just exited with work
// so the TAS (or JD) may now have a lot of work.
synchronized (HttpWorkerThread.class) {
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/jp/JobProcessComponent.java Thu Sep 21 20:39:39 2017
@@ -277,13 +277,13 @@ implements IJobProcessor{
getLogger().info("start", null,"Ducc JP JobType="+jobType);
httpClient = new DuccHttpClient(this);
- String jdURL="";
+ String jdURL = "";
try {
jdURL = System.getProperty(FlagsHelper.Name.JdURL.pname());
- // initialize http client. It tests the connection and fails
- // if unable to connect
+ // Test the connection and fail if unable to connect
+ // Gets the url from the registry if not in the system properties
httpClient.initialize(jdURL);
- logger.info("start", null,"The JP Connected To JD Using URL "+jdURL);
+ logger.info("start", null,"The JP Connected To JD Using URL "+httpClient.getJdUrl());
} catch( Exception ee ) {
if ( ee.getCause() != null && ee instanceof java.net.ConnectException ) {
logger.error("start", null, "JP Process Unable To Connect to the JD Using Provided URL:"+jdURL+" Unable to Continue - Shutting Down JP");
Modified: uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java
URL: http://svn.apache.org/viewvc/uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java?rev=1809225&r1=1809224&r2=1809225&view=diff
==============================================================================
--- uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java (original)
+++ uima/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/configuration/service/ServiceConfiguration.java Thu Sep 21 20:39:39 2017
@@ -31,7 +31,6 @@ import org.apache.uima.ducc.common.conta
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.DuccExchange;
import org.apache.uima.ducc.transport.DuccTransportConfiguration;
-import org.apache.uima.ducc.transport.agent.ProcessStateUpdate;
import org.apache.uima.ducc.transport.configuration.jp.AgentSession;
import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
import org.springframework.beans.factory.annotation.Autowired;